Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 3CCD7200B8C for ; Mon, 12 Sep 2016 13:18:45 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 3B68B160AB8; Mon, 12 Sep 2016 11:18:45 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 12957160AD9 for ; Mon, 12 Sep 2016 13:18:41 +0200 (CEST) Received: (qmail 17527 invoked by uid 500); 12 Sep 2016 11:18:41 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 17436 invoked by uid 99); 12 Sep 2016 11:18:41 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 12 Sep 2016 11:18:41 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 035B6E08BA; Mon, 12 Sep 2016 11:18:41 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sboikov@apache.org To: commits@ignite.apache.org Date: Mon, 12 Sep 2016 11:18:46 -0000 Message-Id: In-Reply-To: <193da25950224934813b23bf625df204@git.apache.org> References: <193da25950224934813b23bf625df204@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [07/49] ignite git commit: IGNITE-2946: CPP: Optimized GetNext() method for cursors. This closes #992. archived-at: Mon, 12 Sep 2016 11:18:45 -0000 IGNITE-2946: CPP: Optimized GetNext() method for cursors. This closes #992. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e9c797fd Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e9c797fd Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e9c797fd Branch: refs/heads/ignite-comm-opts1 Commit: e9c797fd964727882ad6f40f2a452b17ae7c857e Parents: e3c4868 Author: isapego Authored: Sun Sep 4 16:47:40 2016 +0300 Committer: thatcoach Committed: Sun Sep 4 16:47:40 2016 +0300 ---------------------------------------------------------------------- .../query/PlatformAbstractQueryCursor.java | 11 +- .../cache/query/PlatformFieldsQueryCursor.java | 6 + .../ignite/impl/binary/binary_reader_impl.h | 2 +- .../common/include/ignite/common/concurrent.h | 5 +- .../cpp/core-test/src/cache_query_test.cpp | 243 +++++++++++++------ modules/platforms/cpp/core/Makefile.am | 1 + modules/platforms/cpp/core/include/Makefile.am | 1 + .../include/ignite/cache/query/query_cursor.h | 6 +- .../ignite/cache/query/query_fields_cursor.h | 4 +- .../ignite/impl/cache/query/query_batch.h | 148 +++++++++++ .../impl/cache/query/query_fields_row_impl.h | 30 +-- .../ignite/impl/cache/query/query_impl.h | 30 ++- .../platforms/cpp/core/project/vs/core.vcxproj | 2 + .../cpp/core/project/vs/core.vcxproj.filters | 6 + .../core/src/impl/cache/query/query_batch.cpp | 52 ++++ .../core/src/impl/cache/query/query_impl.cpp | 180 ++++++++------ .../Impl/Cache/Query/FieldsQueryCursor.cs | 3 + 17 files changed, 537 insertions(+), 193 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/e9c797fd/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformAbstractQueryCursor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformAbstractQueryCursor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformAbstractQueryCursor.java index 7422757..ab52b52 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformAbstractQueryCursor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformAbstractQueryCursor.java @@ -70,13 +70,12 @@ public abstract class PlatformAbstractQueryCursor extends PlatformAbstractTar try { int cntPos = writer.reserveInt(); - int cnt; + int cnt = 0; - for (cnt = 0; cnt < batchSize; cnt++) { - if (iter.hasNext()) - write(writer, iter.next()); - else - break; + while (cnt < batchSize && iter.hasNext()) { + write(writer, iter.next()); + + cnt++; } writer.writeInt(cntPos, cnt); http://git-wip-us.apache.org/repos/asf/ignite/blob/e9c797fd/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformFieldsQueryCursor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformFieldsQueryCursor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformFieldsQueryCursor.java index a4cdae6..25f86f2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformFieldsQueryCursor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformFieldsQueryCursor.java @@ -41,9 +41,15 @@ public class PlatformFieldsQueryCursor extends PlatformAbstractQueryCursorIncrement(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/e9c797fd/modules/platforms/cpp/core-test/src/cache_query_test.cpp ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/core-test/src/cache_query_test.cpp b/modules/platforms/cpp/core-test/src/cache_query_test.cpp index 168f3f9..b8cd612 100644 --- a/modules/platforms/cpp/core-test/src/cache_query_test.cpp +++ b/modules/platforms/cpp/core-test/src/cache_query_test.cpp @@ -212,63 +212,6 @@ namespace ignite } } -/** Node started during the test. */ -Ignite grid = Ignite(); - -/** Cache accessor. */ -Cache GetCache() -{ - return grid.GetCache("cache"); -} - -/** - * Test setup fixture. - */ -struct CacheQueryTestSuiteFixture { - /** - * Constructor. - */ - CacheQueryTestSuiteFixture() - { - IgniteConfiguration cfg; - - cfg.jvmOpts.push_back("-Xdebug"); - cfg.jvmOpts.push_back("-Xnoagent"); - cfg.jvmOpts.push_back("-Djava.compiler=NONE"); - cfg.jvmOpts.push_back("-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005"); - cfg.jvmOpts.push_back("-XX:+HeapDumpOnOutOfMemoryError"); - -#ifdef IGNITE_TESTS_32 - cfg.jvmInitMem = 256; - cfg.jvmMaxMem = 768; -#else - cfg.jvmInitMem = 1024; - cfg.jvmMaxMem = 4096; -#endif - - char* cfgPath = getenv("IGNITE_NATIVE_TEST_CPP_CONFIG_PATH"); - - cfg.springCfgPath = std::string(cfgPath).append("/").append("cache-query.xml"); - - IgniteError err; - - Ignite grid0 = Ignition::Start(cfg, &err); - - if (err.GetCode() != IgniteError::IGNITE_SUCCESS) - BOOST_ERROR(err.GetText()); - - grid = grid0; - } - - /** - * Destructor. - */ - ~CacheQueryTestSuiteFixture() - { - Ignition::Stop(grid.GetName(), true); - } -}; - /** * Ensure that HasNext() fails. * @@ -522,6 +465,131 @@ void CheckMultipleGetAll(Cursor& cur, int key1, const std::string& name1, } } +/** + * Test setup fixture. + */ +struct CacheQueryTestSuiteFixture +{ + Ignite StartNode(const char* name) + { + IgniteConfiguration cfg; + + cfg.jvmOpts.push_back("-Xdebug"); + cfg.jvmOpts.push_back("-Xnoagent"); + cfg.jvmOpts.push_back("-Djava.compiler=NONE"); + cfg.jvmOpts.push_back("-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005"); + cfg.jvmOpts.push_back("-XX:+HeapDumpOnOutOfMemoryError"); + +#ifdef IGNITE_TESTS_32 + cfg.jvmInitMem = 256; + cfg.jvmMaxMem = 768; +#else + cfg.jvmInitMem = 1024; + cfg.jvmMaxMem = 4096; +#endif + + cfg.springCfgPath.assign(getenv("IGNITE_NATIVE_TEST_CPP_CONFIG_PATH")).append("/cache-query.xml"); + + IgniteError err; + + Ignite grid0 = Ignition::Start(cfg, name, &err); + + if (err.GetCode() != IgniteError::IGNITE_SUCCESS) + BOOST_ERROR(err.GetText()); + + return grid0; + } + + void CheckFieldsQueryPages(int32_t pageSize, int32_t pagesNum, int32_t additionalNum) + { + // Test simple query. + Cache cache = GetPersonCache(); + + // Test query with two fields of different type. + SqlFieldsQuery qry("select name, age from QueryPerson"); + + QueryFieldsCursor cursor = cache.Query(qry); + CheckEmpty(cursor); + + const int32_t entryCnt = pageSize * pagesNum + additionalNum; // Number of entries. + + qry.SetPageSize(pageSize); + + for (int i = 0; i < entryCnt; i++) + { + std::stringstream stream; + + stream << "A" << i; + + cache.Put(i, QueryPerson(stream.str(), i * 10, BinaryUtils::MakeDateLocal(1970 + i), + BinaryUtils::MakeTimestampLocal(2016, 1, 1, i / 60, i % 60))); + } + + cursor = cache.Query(qry); + + IgniteError error; + + for (int i = 0; i < entryCnt; i++) + { + std::stringstream stream; + + stream << "A" << i; + + std::string expected_name = stream.str(); + int expected_age = i * 10; + + BOOST_REQUIRE(cursor.HasNext(error)); + BOOST_REQUIRE(error.GetCode() == IgniteError::IGNITE_SUCCESS); + + QueryFieldsRow row = cursor.GetNext(error); + BOOST_REQUIRE(error.GetCode() == IgniteError::IGNITE_SUCCESS); + + BOOST_REQUIRE(row.HasNext(error)); + BOOST_REQUIRE(error.GetCode() == IgniteError::IGNITE_SUCCESS); + + std::string name = row.GetNext(error); + BOOST_REQUIRE(error.GetCode() == IgniteError::IGNITE_SUCCESS); + + BOOST_REQUIRE(name == expected_name); + + int age = row.GetNext(error); + BOOST_REQUIRE(error.GetCode() == IgniteError::IGNITE_SUCCESS); + + BOOST_REQUIRE(age == expected_age); + + BOOST_REQUIRE(!row.HasNext()); + } + + CheckEmpty(cursor); + } + + /** + * Constructor. + */ + CacheQueryTestSuiteFixture() : + grid(StartNode("Node1")) + { + // No-op. + } + + /** + * Destructor. + */ + ~CacheQueryTestSuiteFixture() + { + Ignition::StopAll(true); + } + + /** Person cache accessor. */ + Cache GetPersonCache() + { + return grid.GetCache("cache"); + } + + /** Node started during the test. */ + Ignite grid; +}; + BOOST_FIXTURE_TEST_SUITE(CacheQueryTestSuite, CacheQueryTestSuiteFixture) /** @@ -529,7 +597,7 @@ BOOST_FIXTURE_TEST_SUITE(CacheQueryTestSuite, CacheQueryTestSuiteFixture) */ BOOST_AUTO_TEST_CASE(TestSqlQuery) { - Cache cache = GetCache(); + Cache cache = GetPersonCache(); // Test query with no results. SqlQuery qry("QueryPerson", "age < 20"); @@ -585,7 +653,7 @@ BOOST_AUTO_TEST_CASE(TestSqlQuery) */ BOOST_AUTO_TEST_CASE(TestTextQuery) { - Cache cache = GetCache(); + Cache cache = GetPersonCache(); // Test query with no results. TextQuery qry("QueryPerson", "A1"); @@ -631,7 +699,7 @@ BOOST_AUTO_TEST_CASE(TestTextQuery) BOOST_AUTO_TEST_CASE(TestScanQuery) { // Test simple query. - Cache cache = GetCache(); + Cache cache = GetPersonCache(); // Test query with no results. ScanQuery qry; @@ -667,7 +735,7 @@ BOOST_AUTO_TEST_CASE(TestScanQuery) BOOST_AUTO_TEST_CASE(TestScanQueryPartitioned) { // Populate cache with data. - Cache cache = GetCache(); + Cache cache = GetPersonCache(); int32_t partCnt = 256; // Defined in configuration explicitly. int32_t entryCnt = 1000; // Should be greater than partCnt. @@ -716,7 +784,7 @@ BOOST_AUTO_TEST_CASE(TestScanQueryPartitioned) BOOST_AUTO_TEST_CASE(TestFieldsQuerySingle) { // Test simple query. - Cache cache = GetCache(); + Cache cache = GetPersonCache(); // Test query with two fields of different type. SqlFieldsQuery qry("select age, name from QueryPerson"); @@ -761,7 +829,7 @@ BOOST_AUTO_TEST_CASE(TestFieldsQuerySingle) BOOST_AUTO_TEST_CASE(TestFieldsQueryExceptions) { // Test simple query. - Cache cache = GetCache(); + Cache cache = GetPersonCache(); // Test query with two fields of different type. SqlFieldsQuery qry("select age, name from QueryPerson"); @@ -806,7 +874,7 @@ BOOST_AUTO_TEST_CASE(TestFieldsQueryExceptions) BOOST_AUTO_TEST_CASE(TestFieldsQueryTwo) { // Test simple query. - Cache cache = GetCache(); + Cache cache = GetPersonCache(); // Test query with two fields of different type. SqlFieldsQuery qry("select age, name from QueryPerson"); @@ -869,7 +937,7 @@ BOOST_AUTO_TEST_CASE(TestFieldsQueryTwo) BOOST_AUTO_TEST_CASE(TestFieldsQuerySeveral) { // Test simple query. - Cache cache = GetCache(); + Cache cache = GetPersonCache(); // Test query with two fields of different type. SqlFieldsQuery qry("select name, age from QueryPerson"); @@ -935,7 +1003,7 @@ BOOST_AUTO_TEST_CASE(TestFieldsQuerySeveral) BOOST_AUTO_TEST_CASE(TestFieldsQueryDateLess) { // Test simple query. - Cache cache = GetCache(); + Cache cache = GetPersonCache(); // Test query with field of type 'Date'. SqlFieldsQuery qry("select birthday from QueryPerson where birthday<'1990-01-01'"); @@ -996,7 +1064,7 @@ BOOST_AUTO_TEST_CASE(TestFieldsQueryDateLess) BOOST_AUTO_TEST_CASE(TestFieldsQueryDateMore) { // Test simple query. - Cache cache = GetCache(); + Cache cache = GetPersonCache(); // Test query with field of type 'Date'. SqlFieldsQuery qry("select birthday from QueryPerson where birthday>'2070-01-01'"); @@ -1057,7 +1125,7 @@ BOOST_AUTO_TEST_CASE(TestFieldsQueryDateMore) BOOST_AUTO_TEST_CASE(TestFieldsQueryDateEqual) { // Test simple query. - Cache cache = GetCache(); + Cache cache = GetPersonCache(); // Test query with field of type 'Date'. SqlFieldsQuery qry("select birthday from QueryPerson where birthday='2032-01-01'"); @@ -1109,7 +1177,7 @@ BOOST_AUTO_TEST_CASE(TestFieldsQueryDateEqual) BOOST_AUTO_TEST_CASE(TestFieldsQueryTimestampLess) { // Test simple query. - Cache cache = GetCache(); + Cache cache = GetPersonCache(); // Test query with field of type 'Timestamp'. SqlFieldsQuery qry("select recordCreated from QueryPerson where recordCreated<'2016-01-01 01:00:00'"); @@ -1170,7 +1238,7 @@ BOOST_AUTO_TEST_CASE(TestFieldsQueryTimestampLess) BOOST_AUTO_TEST_CASE(TestFieldsQueryTimestampMore) { // Test simple query. - Cache cache = GetCache(); + Cache cache = GetPersonCache(); // Test query with field of type 'Timestamp'. SqlFieldsQuery qry("select recordCreated from QueryPerson where recordCreated>'2016-01-01 15:30:00'"); @@ -1233,7 +1301,7 @@ BOOST_AUTO_TEST_CASE(TestFieldsQueryTimestampMore) BOOST_AUTO_TEST_CASE(TestFieldsQueryTimestampEqual) { // Test simple query. - Cache cache = GetCache(); + Cache cache = GetPersonCache(); // Test query with field of type 'Timestamp'. SqlFieldsQuery qry("select recordCreated from QueryPerson where recordCreated='2016-01-01 09:18:00'"); @@ -1279,4 +1347,37 @@ BOOST_AUTO_TEST_CASE(TestFieldsQueryTimestampEqual) CheckEmpty(cursor); } +/** + * Test fields query with several pages. + */ +BOOST_AUTO_TEST_CASE(TestFieldsQueryPagesSeveral) +{ + CheckFieldsQueryPages(32, 8, 1); +} + +/** + * Test fields query with page size 1. + */ +BOOST_AUTO_TEST_CASE(TestFieldsQueryPageSingle) +{ + CheckFieldsQueryPages(1, 100, 0); +} + +/** + * Test fields query with page size 0. + */ +BOOST_AUTO_TEST_CASE(TestFieldsQueryPageZero) +{ + try + { + CheckFieldsQueryPages(0, 100, 0); + + BOOST_FAIL("Exception expected."); + } + catch (IgniteError&) + { + // Expected. + } +} + BOOST_AUTO_TEST_SUITE_END() http://git-wip-us.apache.org/repos/asf/ignite/blob/e9c797fd/modules/platforms/cpp/core/Makefile.am ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/core/Makefile.am b/modules/platforms/cpp/core/Makefile.am index 2b73476..bbb7720 100644 --- a/modules/platforms/cpp/core/Makefile.am +++ b/modules/platforms/cpp/core/Makefile.am @@ -59,6 +59,7 @@ libignite_la_SOURCES = \ src/impl/handle_registry.cpp \ src/impl/cache/query/query_impl.cpp \ src/impl/cache/cache_impl.cpp \ + src/impl/cache/query/query_batch.cpp \ src/impl/interop/interop_external_memory.cpp \ src/impl/interop/interop_target.cpp \ src/impl/transactions/transaction_impl.cpp \ http://git-wip-us.apache.org/repos/asf/ignite/blob/e9c797fd/modules/platforms/cpp/core/include/Makefile.am ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/core/include/Makefile.am b/modules/platforms/cpp/core/include/Makefile.am index f7159ae..fb84bc5 100644 --- a/modules/platforms/cpp/core/include/Makefile.am +++ b/modules/platforms/cpp/core/include/Makefile.am @@ -27,6 +27,7 @@ nobase_include_HEADERS = \ ignite/impl/cache/query/query_fields_row_impl.h \ ignite/impl/cache/query/query_impl.h \ ignite/impl/cache/cache_impl.h \ + ignite/impl/cache/query/query_batch.h \ ignite/impl/interop/interop_target.h \ ignite/impl/interop/interop_external_memory.h \ ignite/impl/handle_registry.h \ http://git-wip-us.apache.org/repos/asf/ignite/blob/e9c797fd/modules/platforms/cpp/core/include/ignite/cache/query/query_cursor.h ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/core/include/ignite/cache/query/query_cursor.h b/modules/platforms/cpp/core/include/ignite/cache/query/query_cursor.h index 4c46662..61c6813 100644 --- a/modules/platforms/cpp/core/include/ignite/cache/query/query_cursor.h +++ b/modules/platforms/cpp/core/include/ignite/cache/query/query_cursor.h @@ -112,7 +112,7 @@ namespace ignite impl::cache::query::QueryCursorImpl* impl0 = impl.Get(); if (impl0) - return impl0->HasNext(&err); + return impl0->HasNext(err); else { err = IgniteError(IgniteError::IGNITE_ERR_GENERIC, @@ -160,7 +160,7 @@ namespace ignite if (impl0) { impl::Out2Operation outOp; - impl0->GetNext(outOp, &err); + impl0->GetNext(outOp, err); if (err.GetCode() == IgniteError::IGNITE_SUCCESS) { @@ -215,7 +215,7 @@ namespace ignite if (impl0) { impl::OutQueryGetAllOperation outOp(&res); - impl0->GetAll(outOp, &err); + impl0->GetAll(outOp, err); } else err = IgniteError(IgniteError::IGNITE_ERR_GENERIC, http://git-wip-us.apache.org/repos/asf/ignite/blob/e9c797fd/modules/platforms/cpp/core/include/ignite/cache/query/query_fields_cursor.h ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/core/include/ignite/cache/query/query_fields_cursor.h b/modules/platforms/cpp/core/include/ignite/cache/query/query_fields_cursor.h index 3946e1c..36e5f5c 100644 --- a/modules/platforms/cpp/core/include/ignite/cache/query/query_fields_cursor.h +++ b/modules/platforms/cpp/core/include/ignite/cache/query/query_fields_cursor.h @@ -108,7 +108,7 @@ namespace ignite impl::cache::query::QueryCursorImpl* impl0 = impl.Get(); if (impl0) - return impl0->HasNext(&err); + return impl0->HasNext(err); else { err = IgniteError(IgniteError::IGNITE_ERR_GENERIC, @@ -153,7 +153,7 @@ namespace ignite impl::cache::query::QueryCursorImpl* impl0 = impl.Get(); if (impl0) - return impl0->GetNextRow(&err); + return impl0->GetNextRow(err); else { err = IgniteError(IgniteError::IGNITE_ERR_GENERIC, http://git-wip-us.apache.org/repos/asf/ignite/blob/e9c797fd/modules/platforms/cpp/core/include/ignite/impl/cache/query/query_batch.h ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/core/include/ignite/impl/cache/query/query_batch.h b/modules/platforms/cpp/core/include/ignite/impl/cache/query/query_batch.h new file mode 100644 index 0000000..15d6edb --- /dev/null +++ b/modules/platforms/cpp/core/include/ignite/impl/cache/query/query_batch.h @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef _IGNITE_CACHE_QUERY_BATCH +#define _IGNITE_CACHE_QUERY_BATCH + +#include + +#include "ignite/ignite_error.h" +#include "ignite/impl/ignite_environment.h" +#include "ignite/impl/operations.h" + +namespace ignite +{ + namespace impl + { + namespace cache + { + namespace query + { + class QueryFieldsRowImpl; + + /** + * Query batch. + */ + class IGNITE_IMPORT_EXPORT QueryBatch + { + typedef common::concurrent::SharedPointer MemorySharedPtr; + + public: + /** + * Constructor. + * + * @param env Environment. + * @param mem Batch memory. + */ + QueryBatch(IgniteEnvironment& env, MemorySharedPtr mem) : + env(env), + mem(mem), + stream(mem.Get()), + reader(&stream), + size(reader.ReadInt32()), + pos(0) + { + // No-op. + } + + /** + * Destructor. + */ + ~QueryBatch() + { + // No-op. + } + + /** + * Check whether batch is empty. + * + * @return True if empty. + */ + bool IsEmpty() const + { + return size == 0; + } + + /** + * Get the number of the unread rows in the batch. + * + * @return Number of the unread rows in the batch. + */ + int32_t Left() const + { + return size - pos; + } + + /** + * Check whether next result exists. + * + * @param err Error. + * @return True if exists. + */ + int32_t Size() + { + return size; + } + + /** + * Get next object. + * + * @param op Operation. + */ + void GetNext(OutputOperation& op) + { + assert(Left() > 0); + + op.ProcessOutput(reader); + + ++pos; + } + + /** + * Get next row. + * + * @return Output row. + */ + QueryFieldsRowImpl* GetNextRow(); + + private: + /** Environment. */ + IgniteEnvironment& env; + + /** Memomy containing the batch. */ + MemorySharedPtr mem; + + /** Stream. */ + interop::InteropInputStream stream; + + /** Reader. */ + binary::BinaryReaderImpl reader; + + /** Result batch size. */ + int32_t size; + + /** Position in memory. */ + int32_t pos; + + IGNITE_NO_COPY_ASSIGNMENT(QueryBatch); + }; + } + } + } +} + +#endif // _IGNITE_CACHE_QUERY_BATCH http://git-wip-us.apache.org/repos/asf/ignite/blob/e9c797fd/modules/platforms/cpp/core/include/ignite/impl/cache/query/query_fields_row_impl.h ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/core/include/ignite/impl/cache/query/query_fields_row_impl.h b/modules/platforms/cpp/core/include/ignite/impl/cache/query/query_fields_row_impl.h index 233c2d4..82cebd5 100644 --- a/modules/platforms/cpp/core/include/ignite/impl/cache/query/query_fields_row_impl.h +++ b/modules/platforms/cpp/core/include/ignite/impl/cache/query/query_fields_row_impl.h @@ -18,16 +18,9 @@ #ifndef _IGNITE_IMPL_CACHE_QUERY_CACHE_QUERY_FIELDS_ROW_IMPL #define _IGNITE_IMPL_CACHE_QUERY_CACHE_QUERY_FIELDS_ROW_IMPL -#include -#include - #include #include -#include "ignite/cache/cache_entry.h" -#include "ignite/impl/cache/query/query_impl.h" -#include "ignite/impl/operations.h" - namespace ignite { namespace impl @@ -45,23 +38,18 @@ namespace ignite typedef common::concurrent::SharedPointer SP_InteropMemory; /** - * Default constructor. - */ - QueryFieldsRowImpl() : mem(0), stream(0), reader(0), size(0), - processed(0) - { - // No-op. - } - - /** * Constructor. * * @param mem Memory containig row data. */ - QueryFieldsRowImpl(SP_InteropMemory mem) : mem(mem), stream(mem.Get()), - reader(&stream), size(reader.ReadInt32()), processed(0) + QueryFieldsRowImpl(SP_InteropMemory mem, int32_t rowBegin, int32_t columnNum) : + mem(mem), + stream(mem.Get()), + reader(&stream), + columnNum(columnNum), + processed(0) { - // No-op. + stream.Position(rowBegin); } /** @@ -89,7 +77,7 @@ namespace ignite bool HasNext(IgniteError& err) { if (IsValid()) - return processed < size; + return processed < columnNum; else { err = IgniteError(IgniteError::IGNITE_ERR_GENERIC, @@ -165,7 +153,7 @@ namespace ignite binary::BinaryReaderImpl reader; /** Number of elements in a row. */ - int32_t size; + int32_t columnNum; /** Number of elements that have been read by now. */ int32_t processed; http://git-wip-us.apache.org/repos/asf/ignite/blob/e9c797fd/modules/platforms/cpp/core/include/ignite/impl/cache/query/query_impl.h ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/core/include/ignite/impl/cache/query/query_impl.h b/modules/platforms/cpp/core/include/ignite/impl/cache/query/query_impl.h index 0f17c32..4083c7c 100644 --- a/modules/platforms/cpp/core/include/ignite/impl/cache/query/query_impl.h +++ b/modules/platforms/cpp/core/include/ignite/impl/cache/query/query_impl.h @@ -22,6 +22,7 @@ #include "ignite/impl/ignite_environment.h" #include "ignite/impl/operations.h" +#include "ignite/impl/cache/query/query_batch.h" namespace ignite { @@ -58,7 +59,7 @@ namespace ignite * @param err Error. * @return True if exists. */ - bool HasNext(IgniteError* err); + bool HasNext(IgniteError& err); /** * Get next object. @@ -66,7 +67,7 @@ namespace ignite * @param op Operation. * @param err Error. */ - void GetNext(OutputOperation& op, IgniteError* err); + void GetNext(OutputOperation& op, IgniteError& err); /** * Get next row. @@ -74,7 +75,7 @@ namespace ignite * @param err Error. * @return Output row. */ - QueryFieldsRowImpl* GetNextRow(IgniteError* err); + QueryFieldsRowImpl* GetNextRow(IgniteError& err); /** * Get all cursor entries. @@ -82,7 +83,7 @@ namespace ignite * @param op Operation. * @param err Error. */ - void GetAll(OutputOperation& op, IgniteError* err); + void GetAll(OutputOperation& op, IgniteError& err); private: /** Environment. */ @@ -91,15 +92,18 @@ namespace ignite /** Handle to Java object. */ jobject javaRef; + /** Current result batch. */ + QueryBatch* batch; + + /** Whether cursor has no more elements available. */ + bool endReached; + /** Whether iteration methods were called. */ bool iterCalled; /** Whether GetAll() method was called. */ bool getAllCalled; - /** Whether next entry is available. */ - bool hasNext; - IGNITE_NO_COPY_ASSIGNMENT(QueryCursorImpl); /** @@ -108,7 +112,15 @@ namespace ignite * @param err Error. * @return True in case of success, false if an error is thrown. */ - bool CreateIteratorIfNeeded(IgniteError* err); + bool CreateIteratorIfNeeded(IgniteError& err); + + /** + * Get next result batch if update is needed. + * + * @param err Error. + * @return True if operation has been successful. + */ + bool GetNextBatchIfNeeded(IgniteError& err); /** * Check whether Java-side iterator has next element. @@ -116,7 +128,7 @@ namespace ignite * @param err Error. * @return True if the next element is available. */ - bool IteratorHasNext(IgniteError* err); + bool IteratorHasNext(IgniteError& err); }; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/e9c797fd/modules/platforms/cpp/core/project/vs/core.vcxproj ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/core/project/vs/core.vcxproj b/modules/platforms/cpp/core/project/vs/core.vcxproj index 0797c31..ca14a1d 100644 --- a/modules/platforms/cpp/core/project/vs/core.vcxproj +++ b/modules/platforms/cpp/core/project/vs/core.vcxproj @@ -209,6 +209,7 @@ + @@ -229,6 +230,7 @@ + http://git-wip-us.apache.org/repos/asf/ignite/blob/e9c797fd/modules/platforms/cpp/core/project/vs/core.vcxproj.filters ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/core/project/vs/core.vcxproj.filters b/modules/platforms/cpp/core/project/vs/core.vcxproj.filters index c90b697..c5fb532 100644 --- a/modules/platforms/cpp/core/project/vs/core.vcxproj.filters +++ b/modules/platforms/cpp/core/project/vs/core.vcxproj.filters @@ -43,6 +43,9 @@ Code\impl\interop + + Code\impl\cache\query + @@ -138,6 +141,9 @@ Code\impl\interop + + Code\impl\cache\query + http://git-wip-us.apache.org/repos/asf/ignite/blob/e9c797fd/modules/platforms/cpp/core/src/impl/cache/query/query_batch.cpp ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/core/src/impl/cache/query/query_batch.cpp b/modules/platforms/cpp/core/src/impl/cache/query/query_batch.cpp new file mode 100644 index 0000000..44086af --- /dev/null +++ b/modules/platforms/cpp/core/src/impl/cache/query/query_batch.cpp @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "ignite/impl/cache/query/query_batch.h" +#include "ignite/impl/cache/query/query_fields_row_impl.h" + +namespace ignite +{ + namespace impl + { + namespace cache + { + namespace query + { + QueryFieldsRowImpl* QueryBatch::GetNextRow() + { + assert(Left() > 0); + + int32_t rowBegin = stream.Position(); + + int32_t rowLen = reader.ReadInt32(); + int32_t columnNum = reader.ReadInt32(); + + int32_t dataPos = stream.Position(); + + assert(rowLen >= 4); + + ++pos; + + stream.Position(rowBegin + rowLen); + + return new QueryFieldsRowImpl(mem, dataPos, columnNum); + } + + } + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/e9c797fd/modules/platforms/cpp/core/src/impl/cache/query/query_impl.cpp ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/core/src/impl/cache/query/query_impl.cpp b/modules/platforms/cpp/core/src/impl/cache/query/query_impl.cpp index 880e8b1..73d9924 100644 --- a/modules/platforms/cpp/core/src/impl/cache/query/query_impl.cpp +++ b/modules/platforms/cpp/core/src/impl/cache/query/query_impl.cpp @@ -35,30 +35,41 @@ namespace ignite /** Operation: get all entries. */ const int32_t OP_GET_ALL = 1; + /** Operation: get multiple entries. */ + const int32_t OP_GET_BATCH = 2; + /** Operation: get single entry. */ const int32_t OP_GET_SINGLE = 3; QueryCursorImpl::QueryCursorImpl(SharedPointer env, jobject javaRef) : - env(env), javaRef(javaRef), iterCalled(false), getAllCalled(false), hasNext(false) + env(env), + javaRef(javaRef), + batch(0), + endReached(false), + iterCalled(false), + getAllCalled(false) { // No-op. } QueryCursorImpl::~QueryCursorImpl() { - // 1. Close the cursor. + // 1. Releasing memory. + delete batch; + + // 2. Close the cursor. env.Get()->Context()->QueryCursorClose(javaRef); - // 2. Release Java reference. + // 3. Release Java reference. JniContext::Release(javaRef); } - bool QueryCursorImpl::HasNext(IgniteError* err) + bool QueryCursorImpl::HasNext(IgniteError& err) { // Check whether GetAll() was called earlier. if (getAllCalled) { - *err = IgniteError(IgniteError::IGNITE_ERR_GENERIC, + err = IgniteError(IgniteError::IGNITE_ERR_GENERIC, "Cannot use HasNext() method because GetAll() was called."); return false; @@ -67,16 +78,21 @@ namespace ignite // Create iterator in Java if needed. if (!CreateIteratorIfNeeded(err)) return false; - - return hasNext; + + // Get next results batch if the end in the current batch + // has been reached. + if (!GetNextBatchIfNeeded(err)) + return false; + + return !endReached; } - void QueryCursorImpl::GetNext(OutputOperation& op, IgniteError* err) + void QueryCursorImpl::GetNext(OutputOperation& op, IgniteError& err) { // Check whether GetAll() was called earlier. if (getAllCalled) { - *err = IgniteError(IgniteError::IGNITE_ERR_GENERIC, + err = IgniteError(IgniteError::IGNITE_ERR_GENERIC, "Cannot use GetNext() method because GetAll() was called."); return; @@ -86,75 +102,52 @@ namespace ignite if (!CreateIteratorIfNeeded(err)) return; - if (hasNext) - { - JniErrorInfo jniErr; - - SharedPointer inMem = env.Get()->AllocateMemory(); - - env.Get()->Context()->TargetOutStream( - javaRef, OP_GET_SINGLE, inMem.Get()->PointerLong(), &jniErr); - - IgniteError::SetError(jniErr.code, jniErr.errCls, jniErr.errMsg, err); - - if (jniErr.code == IGNITE_JNI_ERR_SUCCESS) - { - InteropInputStream in(inMem.Get()); - - binary::BinaryReaderImpl reader(&in); - - op.ProcessOutput(reader); + // Get next results batch if the end in the current batch + // has been reached. + if (!GetNextBatchIfNeeded(err)) + return; - hasNext = IteratorHasNext(err); - } - } - else + if (endReached) { // Ensure we do not overwrite possible previous error. - if (err->GetCode() == IgniteError::IGNITE_SUCCESS) - *err = IgniteError(IgniteError::IGNITE_ERR_GENERIC, "No more elements available."); + if (err.GetCode() == IgniteError::IGNITE_SUCCESS) + err = IgniteError(IgniteError::IGNITE_ERR_GENERIC, "No more elements available."); + + return; } + + batch->GetNext(op); } - QueryFieldsRowImpl* QueryCursorImpl::GetNextRow(IgniteError* err) + QueryFieldsRowImpl* QueryCursorImpl::GetNextRow(IgniteError& err) { // Create iterator in Java if needed. if (!CreateIteratorIfNeeded(err)) - return NULL; - - if (hasNext) - { - JniErrorInfo jniErr; - - SharedPointer inMem = env.Get()->AllocateMemory(); + return 0; - env.Get()->Context()->TargetOutStream(javaRef, OP_GET_SINGLE, inMem.Get()->PointerLong(), &jniErr); + // Get next results batch if the end in the current batch + // has been reached. + if (!GetNextBatchIfNeeded(err)) + return 0; - IgniteError::SetError(jniErr.code, jniErr.errCls, jniErr.errMsg, err); - - if (jniErr.code == IGNITE_JNI_ERR_SUCCESS) - { - hasNext = IteratorHasNext(err); - - return new QueryFieldsRowImpl(inMem); - } - } - else + if (endReached) { // Ensure we do not overwrite possible previous error. - if (err->GetCode() == IgniteError::IGNITE_SUCCESS) - *err = IgniteError(IgniteError::IGNITE_ERR_GENERIC, "No more elements available."); + if (err.GetCode() == IgniteError::IGNITE_SUCCESS) + err = IgniteError(IgniteError::IGNITE_ERR_GENERIC, "No more elements available."); + + return 0; } - return NULL; + return batch->GetNextRow(); } - void QueryCursorImpl::GetAll(OutputOperation& op, IgniteError* err) + void QueryCursorImpl::GetAll(OutputOperation& op, IgniteError& err) { // Check whether any of iterator methods were called. if (iterCalled) { - *err = IgniteError(IgniteError::IGNITE_ERR_GENERIC, + err = IgniteError(IgniteError::IGNITE_ERR_GENERIC, "Cannot use GetAll() method because an iteration method was called."); return; @@ -163,7 +156,7 @@ namespace ignite // Check whether GetAll was called before. if (getAllCalled) { - *err = IgniteError(IgniteError::IGNITE_ERR_GENERIC, + err = IgniteError(IgniteError::IGNITE_ERR_GENERIC, "Cannot use GetNext() method because GetAll() was called."); return; @@ -176,7 +169,7 @@ namespace ignite env.Get()->Context()->TargetOutStream(javaRef, OP_GET_ALL, inMem.Get()->PointerLong(), &jniErr); - IgniteError::SetError(jniErr.code, jniErr.errCls, jniErr.errMsg, err); + IgniteError::SetError(jniErr.code, jniErr.errCls, jniErr.errMsg, &err); if (jniErr.code == IGNITE_JNI_ERR_SUCCESS) { @@ -190,38 +183,71 @@ namespace ignite } } - bool QueryCursorImpl::CreateIteratorIfNeeded(IgniteError* err) + bool QueryCursorImpl::CreateIteratorIfNeeded(IgniteError& err) { - if (!iterCalled) - { - JniErrorInfo jniErr; + if (iterCalled) + return true; - env.Get()->Context()->QueryCursorIterator(javaRef, &jniErr); + JniErrorInfo jniErr; - IgniteError::SetError(jniErr.code, jniErr.errCls, jniErr.errMsg, err); + env.Get()->Context()->QueryCursorIterator(javaRef, &jniErr); - if (jniErr.code == IGNITE_JNI_ERR_SUCCESS) - { - iterCalled = true; + IgniteError::SetError(jniErr.code, jniErr.errCls, jniErr.errMsg, &err); + + if (jniErr.code == IGNITE_JNI_ERR_SUCCESS) + iterCalled = true; + + return iterCalled; + } + + bool QueryCursorImpl::GetNextBatchIfNeeded(IgniteError& err) + { + assert(iterCalled); + + if (endReached || (batch && batch->Left() > 0)) + return true; + + endReached = !IteratorHasNext(err); + + if (endReached) + return true; + + JniErrorInfo jniErr; + + SharedPointer inMem = env.Get()->AllocateMemory(); + + env.Get()->Context()->TargetOutStream( + javaRef, OP_GET_BATCH, inMem.Get()->PointerLong(), &jniErr); + + IgniteError::SetError(jniErr.code, jniErr.errCls, jniErr.errMsg, &err); + + if (jniErr.code != IGNITE_JNI_ERR_SUCCESS) + return false; + + delete batch; + + // Needed for exception safety. + batch = 0; + + batch = new QueryBatch(*env.Get(), inMem); + + endReached = batch->IsEmpty(); - hasNext = IteratorHasNext(err); - } - else - return false; - } - return true; } - bool QueryCursorImpl::IteratorHasNext(IgniteError* err) + bool QueryCursorImpl::IteratorHasNext(IgniteError& err) { JniErrorInfo jniErr; bool res = env.Get()->Context()->QueryCursorIteratorHasNext(javaRef, &jniErr); - IgniteError::SetError(jniErr.code, jniErr.errCls, jniErr.errMsg, err); + IgniteError::SetError(jniErr.code, jniErr.errCls, jniErr.errMsg, &err); + + if (jniErr.code == IGNITE_JNI_ERR_SUCCESS) + return res; - return jniErr.code == IGNITE_JNI_ERR_SUCCESS && res; + return false; } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/e9c797fd/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Query/FieldsQueryCursor.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Query/FieldsQueryCursor.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Query/FieldsQueryCursor.cs index d33fdce..d928418 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Query/FieldsQueryCursor.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Query/FieldsQueryCursor.cs @@ -52,6 +52,9 @@ namespace Apache.Ignite.Core.Impl.Cache.Query [SuppressMessage("Microsoft.Design", "CA1062:Validate arguments of public methods")] protected override T Read(BinaryReader reader) { + // Reading and skipping row size in bytes. + reader.ReadInt(); + int cnt = reader.ReadInt(); return _readerFunc(reader, cnt);