Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 976B6200C48 for ; Thu, 6 Apr 2017 15:31:15 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 9639E160BB1; Thu, 6 Apr 2017 13:31:15 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id D0C54160BAF for ; Thu, 6 Apr 2017 15:31:13 +0200 (CEST) Received: (qmail 95762 invoked by uid 500); 6 Apr 2017 13:31:13 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 94838 invoked by uid 99); 6 Apr 2017 13:31:10 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 06 Apr 2017 13:31:10 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 3ED34E9813; Thu, 6 Apr 2017 13:31:09 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sboikov@apache.org To: commits@ignite.apache.org Date: Thu, 06 Apr 2017 13:31:23 -0000 Message-Id: <3f468b608c664818814d2a25b8f3cf4c@git.apache.org> In-Reply-To: <358816612602444bb6656810c8375230@git.apache.org> References: <358816612602444bb6656810c8375230@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [16/50] [abbrv] ignite git commit: IGNITE-3575 CPP: Added support for continuous queries remote filters. archived-at: Thu, 06 Apr 2017 13:31:15 -0000 http://git-wip-us.apache.org/repos/asf/ignite/blob/d4da92b7/modules/platforms/cpp/core/src/impl/cache/cache_impl.cpp ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/core/src/impl/cache/cache_impl.cpp b/modules/platforms/cpp/core/src/impl/cache/cache_impl.cpp index dfef8e4..858ee77 100644 --- a/modules/platforms/cpp/core/src/impl/cache/cache_impl.cpp +++ b/modules/platforms/cpp/core/src/impl/cache/cache_impl.cpp @@ -17,8 +17,11 @@ #include -#include "ignite/impl/cache/cache_impl.h" -#include "ignite/impl/binary/binary_type_updater_impl.h" +#include +#include +#include + +#include using namespace ignite::common::concurrent; using namespace ignite::jni::java; @@ -381,14 +384,93 @@ namespace ignite IgniteError::SetError(jniErr.code, jniErr.errCls, jniErr.errMsg, err); } - struct DummyQry { void Write(BinaryRawWriter&) const { }}; + struct Dummy + { + void Write(BinaryRawWriter&) const + { + // No-op. + } + }; ContinuousQueryHandleImpl* CacheImpl::QueryContinuous(const SharedPointer qry, IgniteError& err) { - DummyQry dummy; + Dummy dummy; return QueryContinuous(qry, dummy, -1, OP_QRY_CONTINUOUS, err); } + + template + QueryCursorImpl* CacheImpl::QueryInternal(const T& qry, int32_t typ, IgniteError& err) + { + JniErrorInfo jniErr; + + SharedPointer mem = GetEnvironment().AllocateMemory(); + InteropMemory* mem0 = mem.Get(); + InteropOutputStream out(mem0); + BinaryWriterImpl writer(&out, GetEnvironment().GetTypeManager()); + BinaryRawWriter rawWriter(&writer); + + qry.Write(rawWriter); + + out.Synchronize(); + + jobject qryJavaRef = GetEnvironment().Context()->CacheOutOpQueryCursor(GetTarget(), + typ, mem.Get()->PointerLong(), &jniErr); + + IgniteError::SetError(jniErr.code, jniErr.errCls, jniErr.errMsg, err); + + if (jniErr.code == IGNITE_JNI_ERR_SUCCESS) + return new QueryCursorImpl(GetEnvironmentPointer(), qryJavaRef); + + return 0; + } + + template + ContinuousQueryHandleImpl* CacheImpl::QueryContinuous(const SharedPointer qry, + const T& initialQry, int32_t typ, int32_t cmd, IgniteError& err) + { + JniErrorInfo jniErr; + + SharedPointer mem = GetEnvironment().AllocateMemory(); + InteropMemory* mem0 = mem.Get(); + InteropOutputStream out(mem0); + BinaryWriterImpl writer(&out, GetEnvironment().GetTypeManager()); + BinaryRawWriter rawWriter(&writer); + + const ContinuousQueryImplBase& qry0 = *qry.Get(); + + int64_t handle = GetEnvironment().GetHandleRegistry().Allocate(qry); + + rawWriter.WriteInt64(handle); + rawWriter.WriteBool(qry0.GetLocal()); + + event::CacheEntryEventFilterHolderBase& filterOp = qry0.GetFilterHolder(); + + filterOp.Write(writer); + + rawWriter.WriteInt32(qry0.GetBufferSize()); + rawWriter.WriteInt64(qry0.GetTimeInterval()); + + // Autounsubscribe is a filter feature. + rawWriter.WriteBool(false); + + // Writing initial query. When there is not initial query writing -1. + rawWriter.WriteInt32(typ); + if (typ != -1) + initialQry.Write(rawWriter); + + out.Synchronize(); + + jobject qryJavaRef = GetEnvironment().Context()->CacheOutOpContinuousQuery(GetTarget(), + cmd, mem.Get()->PointerLong(), &jniErr); + + IgniteError::SetError(jniErr.code, jniErr.errCls, jniErr.errMsg, err); + + if (jniErr.code == IGNITE_JNI_ERR_SUCCESS) + return new ContinuousQueryHandleImpl(GetEnvironmentPointer(), handle, qryJavaRef); + + return 0; + } } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/d4da92b7/modules/platforms/cpp/core/src/impl/cache/query/continuous/continuous_query_handle_impl.cpp ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/core/src/impl/cache/query/continuous/continuous_query_handle_impl.cpp b/modules/platforms/cpp/core/src/impl/cache/query/continuous/continuous_query_handle_impl.cpp index b2fa1fd..b15183b 100644 --- a/modules/platforms/cpp/core/src/impl/cache/query/continuous/continuous_query_handle_impl.cpp +++ b/modules/platforms/cpp/core/src/impl/cache/query/continuous/continuous_query_handle_impl.cpp @@ -84,11 +84,6 @@ namespace ignite return new QueryCursorImpl(env, res); } - - void ContinuousQueryHandleImpl::SetQuery(SP_ContinuousQueryImplBase query) - { - qry = query; - } } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/d4da92b7/modules/platforms/cpp/core/src/impl/ignite_binding_impl.cpp ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/core/src/impl/ignite_binding_impl.cpp b/modules/platforms/cpp/core/src/impl/ignite_binding_impl.cpp new file mode 100644 index 0000000..2e09de2 --- /dev/null +++ b/modules/platforms/cpp/core/src/impl/ignite_binding_impl.cpp @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include + +using namespace ignite::common::concurrent; + +namespace ignite +{ + namespace impl + { + IgniteBindingImpl::IgniteBindingImpl(IgniteEnvironment &env) : + env(env), + callbacks() + { + // No-op. + } + + int64_t IgniteBindingImpl::InvokeCallback(bool& found, int32_t type, int32_t id, + binary::BinaryReaderImpl& reader, binary::BinaryWriterImpl& writer) + { + int64_t key = makeKey(type, id); + + CsLockGuard guard(lock); + + std::map::iterator it = callbacks.find(key); + + found = it != callbacks.end(); + + if (found) + { + Callback* callback = it->second; + + // We have found callback and does not need lock here anymore. + guard.Reset(); + + return callback(reader, writer, env); + } + + return 0; + } + + void IgniteBindingImpl::RegisterCallback(int32_t type, int32_t id, Callback* proc, IgniteError& err) + { + int64_t key = makeKey(type, id); + + CsLockGuard guard(lock); + + bool inserted = callbacks.insert(std::make_pair(key, proc)).second; + + guard.Reset(); + + if (!inserted) + { + std::stringstream builder; + + builder << "Trying to register multiple PRC callbacks with the same ID. [type=" + << type << ", id=" << id << ']'; + + err = IgniteError(IgniteError::IGNITE_ERR_ENTRY_PROCESSOR, builder.str().c_str()); + } + } + + void IgniteBindingImpl::RegisterCallback(int32_t type, int32_t id, Callback* callback) + { + IgniteError err; + + RegisterCallback(type, id, callback, err); + + IgniteError::ThrowIfNeeded(err); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/d4da92b7/modules/platforms/cpp/core/src/impl/ignite_environment.cpp ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/core/src/impl/ignite_environment.cpp b/modules/platforms/cpp/core/src/impl/ignite_environment.cpp index b37fa8f..4e2a1f2 100644 --- a/modules/platforms/cpp/core/src/impl/ignite_environment.cpp +++ b/modules/platforms/cpp/core/src/impl/ignite_environment.cpp @@ -15,14 +15,18 @@ * limitations under the License. */ -#include "ignite/impl/interop/interop_external_memory.h" -#include "ignite/impl/binary/binary_reader_impl.h" -#include "ignite/impl/ignite_environment.h" -#include "ignite/cache/query/continuous/continuous_query.h" -#include "ignite/binary/binary.h" -#include "ignite/impl/binary/binary_type_updater_impl.h" -#include "ignite/impl/module_manager.h" -#include "ignite/ignite_binding.h" +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +#include using namespace ignite::common::concurrent; using namespace ignite::jni::java; @@ -42,6 +46,8 @@ namespace ignite { CACHE_INVOKE = 8, CONTINUOUS_QUERY_LISTENER_APPLY = 18, + CONTINUOUS_QUERY_FILTER_CREATE = 19, + CONTINUOUS_QUERY_FILTER_APPLY = 20, CONTINUOUS_QUERY_FILTER_RELEASE = 21, REALLOC = 36, ON_START = 49, @@ -57,6 +63,7 @@ namespace ignite */ long long IGNITE_CALL InLongOutLong(void* target, int type, long long val) { + int64_t res = 0; SharedPointer* env = static_cast*>(target); switch (type) @@ -77,6 +84,24 @@ namespace ignite break; } + case CONTINUOUS_QUERY_FILTER_CREATE: + { + SharedPointer mem = env->Get()->GetMemory(val); + + res = env->Get()->OnContinuousQueryFilterCreate(mem); + + break; + } + + case CONTINUOUS_QUERY_FILTER_APPLY: + { + SharedPointer mem = env->Get()->GetMemory(val); + + res = env->Get()->OnContinuousQueryFilterApply(mem); + + break; + } + case CONTINUOUS_QUERY_FILTER_RELEASE: { // No-op. @@ -98,7 +123,7 @@ namespace ignite } } - return 0; + return res; } /** @@ -152,10 +177,14 @@ namespace ignite registry(DEFAULT_FAST_PATH_CONTAINERS_CAP, DEFAULT_SLOW_PATH_CONTAINERS_CAP), metaMgr(new BinaryTypeManager()), metaUpdater(0), - binding(new IgniteBindingImpl()), - moduleMgr(new ModuleManager(GetBindingContext())) + binding(), + moduleMgr() { - // No-op. + binding = SharedPointer(new IgniteBindingImpl(*this)); + + IgniteBindingContext bindingContext(cfg, GetBinding()); + + moduleMgr = SharedPointer(new ModuleManager(bindingContext)); } IgniteEnvironment::~IgniteEnvironment() @@ -263,14 +292,9 @@ namespace ignite return metaUpdater; } - IgniteBinding IgniteEnvironment::GetBinding() const - { - return IgniteBinding(binding); - } - - IgniteBindingContext IgniteEnvironment::GetBindingContext() const + SharedPointer IgniteEnvironment::GetBinding() const { - return IgniteBindingContext(*cfg, GetBinding()); + return binding; } void IgniteEnvironment::ProcessorReleaseStart() @@ -321,6 +345,62 @@ namespace ignite } } + int64_t IgniteEnvironment::OnContinuousQueryFilterCreate(SharedPointer& mem) + { + if (!binding.Get()) + throw IgniteError(IgniteError::IGNITE_ERR_UNKNOWN, "IgniteBinding is not initialized."); + + InteropInputStream inStream(mem.Get()); + BinaryReaderImpl reader(&inStream); + + InteropOutputStream outStream(mem.Get()); + BinaryWriterImpl writer(&outStream, GetTypeManager()); + + BinaryObjectImpl binFilter = BinaryObjectImpl::FromMemory(*mem.Get(), inStream.Position()); + + int32_t filterId = binFilter.GetTypeId(); + + bool invoked = false; + + int64_t res = binding.Get()->InvokeCallback(invoked, + IgniteBindingImpl::CACHE_ENTRY_FILTER_CREATE, filterId, reader, writer); + + if (!invoked) + { + IGNITE_ERROR_FORMATTED_1(IgniteError::IGNITE_ERR_COMPUTE_USER_UNDECLARED_EXCEPTION, + "C++ remote filter is not registered on the node (did you compile your program without -rdynamic?).", + "filterId", filterId); + } + + outStream.Synchronize(); + + return res; + } + + int64_t IgniteEnvironment::OnContinuousQueryFilterApply(SharedPointer& mem) + { + InteropInputStream inStream(mem.Get()); + BinaryReaderImpl reader(&inStream); + BinaryRawReader rawReader(&reader); + + int64_t handle = rawReader.ReadInt64(); + + SharedPointer qry = + StaticPointerCast(registry.Get(handle)); + + if (!qry.Get()) + IGNITE_ERROR_FORMATTED_1(IgniteError::IGNITE_ERR_GENERIC, "Null query for handle.", "handle", handle); + + cache::event::CacheEntryEventFilterBase* filter = qry.Get()->GetFilterHolder().GetFilter(); + + if (!filter) + IGNITE_ERROR_FORMATTED_1(IgniteError::IGNITE_ERR_GENERIC, "Null filter for handle.", "handle", handle); + + bool res = filter->ReadAndProcessEvent(rawReader); + + return res ? 1 : 0; + } + void IgniteEnvironment::CacheInvokeCallback(SharedPointer& mem) { if (!binding.Get()) @@ -340,9 +420,11 @@ namespace ignite BinaryObjectImpl binProcHolder = BinaryObjectImpl::FromMemory(*mem.Get(), inStream.Position(), 0); BinaryObjectImpl binProc = binProcHolder.GetField(0); - int64_t procId = binProc.GetTypeId(); + int32_t procId = binProc.GetTypeId(); + + bool invoked = false; - bool invoked = binding.Get()->InvokeCallbackById(procId, reader, writer); + binding.Get()->InvokeCallback(invoked, IgniteBindingImpl::CACHE_ENTRY_PROCESSOR_APPLY, procId, reader, writer); if (!invoked) { http://git-wip-us.apache.org/repos/asf/ignite/blob/d4da92b7/modules/platforms/cpp/core/src/impl/ignite_impl.cpp ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/core/src/impl/ignite_impl.cpp b/modules/platforms/cpp/core/src/impl/ignite_impl.cpp index fd9bf45..546cd01 100644 --- a/modules/platforms/cpp/core/src/impl/ignite_impl.cpp +++ b/modules/platforms/cpp/core/src/impl/ignite_impl.cpp @@ -59,7 +59,7 @@ namespace ignite return env.Get()->Context(); } - IgniteBinding IgniteImpl::GetBinding() + SharedPointer IgniteImpl::GetBinding() { return env.Get()->GetBinding(); }