ignite-sql-tests - jdbc
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/a06a5575
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/a06a5575
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/a06a5575
Branch: refs/heads/sprint-2
Commit: a06a55750c42649acba4a27697809dde463542f1
Parents: cfcb9a4
Author: S.Vladykin <svladykin@gridgain.com>
Authored: Tue Mar 17 05:23:11 2015 +0300
Committer: S.Vladykin <svladykin@gridgain.com>
Committed: Tue Mar 17 05:23:11 2015 +0300
----------------------------------------------------------------------
.../ignite/jdbc/JdbcEmptyCacheSelfTest.java | 3 +
.../processors/cache/IgniteCacheProxy.java | 4 +-
.../processors/cache/QueryCursorImpl.java | 18 +++
.../query/jdbc/GridCacheQueryJdbcTask.java | 5 +-
.../processors/query/GridQueryProcessor.java | 14 +-
.../processors/query/h2/IgniteH2Indexing.java | 143 +++++++------------
.../query/h2/sql/GridSqlQueryParser.java | 33 +++--
.../query/h2/sql/GridSqlQuerySplitter.java | 19 ++-
.../h2/twostep/GridReduceQueryExecutor.java | 47 +++---
9 files changed, 150 insertions(+), 136 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a06a5575/modules/clients/src/test/java/org/apache/ignite/jdbc/JdbcEmptyCacheSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/JdbcEmptyCacheSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/JdbcEmptyCacheSelfTest.java
index 9742999..3869ddd 100644
--- a/modules/clients/src/test/java/org/apache/ignite/jdbc/JdbcEmptyCacheSelfTest.java
+++ b/modules/clients/src/test/java/org/apache/ignite/jdbc/JdbcEmptyCacheSelfTest.java
@@ -54,6 +54,9 @@ public class JdbcEmptyCacheSelfTest extends GridCommonAbstractTest {
cache.setCacheMode(PARTITIONED);
cache.setBackups(1);
cache.setWriteSynchronizationMode(FULL_SYNC);
+ cache.setIndexedTypes(
+ Byte.class, Byte.class
+ );
cfg.setCacheConfiguration(cache);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a06a5575/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
index aaa63fd..3216ccc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
@@ -489,8 +489,8 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
* @return Cursor.
*/
private QueryCursor<List<?>> doLocalFieldsQuery(SqlFieldsQuery q) {
- return new QueryCursorImpl<>(ctx.kernalContext().query().queryLocalFields(
- ctx.name(), q.getSql(), q.getArgs()));
+ return ctx.kernalContext().query().queryLocalFields(
+ ctx.name(), q.getSql(), q.getArgs());
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a06a5575/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/QueryCursorImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/QueryCursorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/QueryCursorImpl.java
index 62e7376..7cb9efc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/QueryCursorImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/QueryCursorImpl.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache;
import org.apache.ignite.*;
import org.apache.ignite.internal.processors.cache.query.*;
+import org.apache.ignite.internal.processors.query.*;
import java.util.*;
@@ -32,6 +33,9 @@ public class QueryCursorImpl<T> implements QueryCursorEx<T> {
/** */
private boolean iterTaken;
+ /** */
+ private Collection<GridQueryFieldMetadata> fieldsMeta;
+
/**
* @param iter Iterator.
*/
@@ -95,4 +99,18 @@ public class QueryCursorImpl<T> implements QueryCursorEx<T> {
}
}
}
+
+ /**
+ * @param fieldsMeta SQL Fields query result metadata.
+ */
+ public void fieldsMeta(Collection<GridQueryFieldMetadata> fieldsMeta) {
+ this.fieldsMeta = fieldsMeta;
+ }
+
+ /**
+ * @return SQL Fields query result metadata.
+ */
+ public Collection<GridQueryFieldMetadata> fieldsMeta() {
+ return fieldsMeta;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a06a5575/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/jdbc/GridCacheQueryJdbcTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/jdbc/GridCacheQueryJdbcTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/jdbc/GridCacheQueryJdbcTask.java
index b53a9e1..332c649 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/jdbc/GridCacheQueryJdbcTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/jdbc/GridCacheQueryJdbcTask.java
@@ -21,6 +21,7 @@ import org.apache.ignite.*;
import org.apache.ignite.cache.query.*;
import org.apache.ignite.cluster.*;
import org.apache.ignite.compute.*;
+import org.apache.ignite.internal.processors.cache.*;
import org.apache.ignite.internal.processors.query.*;
import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.internal.util.typedef.internal.*;
@@ -194,7 +195,9 @@ public class GridCacheQueryJdbcTask extends ComputeTaskAdapter<byte[], byte[]> {
QueryCursor<List<?>> cursor = cache.queryFields(qry);
- Collection<GridQueryFieldMetadata> meta = null; // TODO
+ Collection<GridQueryFieldMetadata> meta = ((QueryCursorImpl<List<?>>)cursor).fieldsMeta();
+
+ assert meta != null;
tbls = new ArrayList<>(meta.size());
cols = new ArrayList<>(meta.size());
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a06a5575/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index 11a9f2c..aa924c4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -35,7 +35,6 @@ import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.internal.util.typedef.internal.*;
import org.apache.ignite.internal.util.worker.*;
import org.apache.ignite.lang.*;
-import org.apache.ignite.spi.*;
import org.apache.ignite.spi.indexing.*;
import org.jdk8.backport.*;
import org.jetbrains.annotations.*;
@@ -555,13 +554,12 @@ public class GridQueryProcessor extends GridProcessorAdapter {
* @param args Arguments.
* @return Iterator.
*/
- public Iterator<List<?>> queryLocalFields(String space, String sql, Object[] args) {
+ public QueryCursor<List<?>> queryLocalFields(String space, String sql, Object[] args) {
if (!busyLock.enterBusy())
throw new IllegalStateException("Failed to execute query (grid is stopping).");
try {
- IgniteSpiCloseableIterator<List<?>> iterator =
- idx.queryFields(space, sql, F.asList(args), idx.backupFilter()).iterator();
+ GridQueryFieldsResult res = idx.queryFields(space, sql, F.asList(args), idx.backupFilter());
if (ctx.event().isRecordable(EVT_CACHE_QUERY_EXECUTED)) {
ctx.event().record(new CacheQueryExecutedEvent<>(
@@ -579,10 +577,14 @@ public class GridQueryProcessor extends GridProcessorAdapter {
null));
}
- return iterator;
+ QueryCursorImpl<List<?>> cursor = new QueryCursorImpl<>(res.iterator());
+
+ cursor.fieldsMeta(res.metaData());
+
+ return cursor;
}
catch (IgniteCheckedException e) {
- throw new IgniteException(e);
+ throw new CacheException(e);
}
finally {
busyLock.leaveBusy();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a06a5575/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index c3a3da3..1e431db 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -65,7 +65,6 @@ import java.sql.*;
import java.text.*;
import java.util.*;
import java.util.concurrent.*;
-import java.util.concurrent.locks.*;
import static org.apache.ignite.IgniteSystemProperties.*;
import static org.apache.ignite.internal.processors.query.GridQueryIndexType.*;
@@ -496,18 +495,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
if (rs != null) {
try {
- ResultSetMetaData rsMeta = rs.getMetaData();
-
- meta = new ArrayList<>(rsMeta.getColumnCount());
-
- for (int i = 1; i <= rsMeta.getColumnCount(); i++) {
- String schemaName = rsMeta.getSchemaName(i);
- String typeName = rsMeta.getTableName(i);
- String name = rsMeta.getColumnLabel(i);
- String type = rsMeta.getColumnClassName(i);
-
- meta.add(new SqlFieldMetadata(schemaName, typeName, name, type));
- }
+ meta = meta(rs.getMetaData());
}
catch (SQLException e) {
throw new IgniteSpiException("Failed to get meta data.", e);
@@ -522,6 +510,26 @@ public class IgniteH2Indexing implements GridQueryIndexing {
}
/**
+ * @param rsMeta Metadata.
+ * @return List of fields metadata.
+ * @throws SQLException If failed.
+ */
+ private static List<GridQueryFieldMetadata> meta(ResultSetMetaData rsMeta) throws SQLException {
+ ArrayList<GridQueryFieldMetadata> meta = new ArrayList<>(rsMeta.getColumnCount());
+
+ for (int i = 1; i <= rsMeta.getColumnCount(); i++) {
+ String schemaName = rsMeta.getSchemaName(i);
+ String typeName = rsMeta.getTableName(i);
+ String name = rsMeta.getColumnLabel(i);
+ String type = rsMeta.getColumnClassName(i);
+
+ meta.add(new SqlFieldMetadata(schemaName, typeName, name, type));
+ }
+
+ return meta;
+ }
+
+ /**
* @param stmt Prepared statement.
* @return Command type.
*/
@@ -739,12 +747,38 @@ public class IgniteH2Indexing implements GridQueryIndexing {
@Override public QueryCursor<List<?>> queryTwoStep(String space, String sqlQry, Object[] params) {
Connection c = connectionForSpace(space);
- GridCacheTwoStepQuery twoStepQry = GridSqlQuerySplitter.split(c, sqlQry, params);
+ PreparedStatement stmt;
+
+ try {
+ stmt = c.prepareStatement(sqlQry);
+ }
+ catch (SQLException e) {
+ throw new CacheException("Failed to parse query: " + sqlQry, e);
+ }
+
+ GridCacheTwoStepQuery twoStepQry;
+ Collection<GridQueryFieldMetadata> meta;
+
+ try {
+ twoStepQry = GridSqlQuerySplitter.split((JdbcPreparedStatement)stmt, params);
+
+ meta = meta(stmt.getMetaData());
+ }
+ catch (SQLException e) {
+ throw new CacheException(e);
+ }
+ finally {
+ U.close(stmt, log);
+ }
if (log.isDebugEnabled())
log.debug("Parsed query: `" + sqlQry + "` into two step query: " + twoStepQry);
- return queryTwoStep(space, twoStepQry);
+ QueryCursorImpl<List<?>> cursor = (QueryCursorImpl<List<?>>)queryTwoStep(space, twoStepQry);
+
+ cursor.fieldsMeta(meta);
+
+ return cursor;
}
/**
@@ -1049,7 +1083,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
if (Utils.serializer != null)
U.warn(log, "Custom H2 serialization is already configured, will override.");
- Utils.serializer = h2Serializer(ctx != null && ctx.deploy().enabled());
+ Utils.serializer = h2Serializer();
String dbName = (ctx != null ? ctx.localNodeId() : UUID.randomUUID()).toString();
@@ -1097,83 +1131,10 @@ public class IgniteH2Indexing implements GridQueryIndexing {
}
/**
- * @param p2pEnabled If peer-deployment is enabled.
* @return Serializer.
*/
- protected JavaObjectSerializer h2Serializer(boolean p2pEnabled) {
- return p2pEnabled ?
- new JavaObjectSerializer() {
- /** */
- private volatile Map<ClassLoader, Byte> ldr2id = Collections.emptyMap();
-
- /** */
- private volatile Map<Byte, ClassLoader> id2ldr = Collections.emptyMap();
-
- /** */
- private byte ldrIdGen = Byte.MIN_VALUE;
-
- /** */
- private final Lock lock = new ReentrantLock();
-
- @Override public byte[] serialize(Object obj) throws Exception {
- ClassLoader ldr = obj.getClass().getClassLoader();
-
- Byte ldrId = ldr2id.get(ldr);
-
- if (ldrId == null) {
- lock.lock();
-
- try {
- ldrId = ldr2id.get(ldr);
-
- if (ldrId == null) {
- ldrId = ldrIdGen++;
-
- if (id2ldr.containsKey(ldrId)) // Overflow.
- throw new IgniteException("Failed to add new peer-to-peer class loader.");
-
- Map<Byte, ClassLoader> id2ldr0 = new HashMap<>(id2ldr);
- Map<ClassLoader, Byte> ldr2id0 = new IdentityHashMap<>(ldr2id);
-
- id2ldr0.put(ldrId, ldr);
- ldr2id0.put(ldr, ldrId);
-
- ldr2id = ldr2id0;
- id2ldr = id2ldr0;
- }
- }
- finally {
- lock.unlock();
- }
- }
-
- byte[] bytes = marshaller.marshal(obj);
-
- int len = bytes.length;
-
- bytes = Arrays.copyOf(bytes, len + 1); // The last byte is for ldrId.
-
- bytes[len] = ldrId;
-
- return bytes;
- }
-
- @Override public Object deserialize(byte[] bytes) throws Exception {
- int last = bytes.length - 1;
-
- byte ldrId = bytes[last];
-
- ClassLoader ldr = id2ldr.get(ldrId);
-
- if (ldr == null)
- throw new IllegalStateException("Class loader was not found: " + ldrId);
-
- bytes = Arrays.copyOf(bytes, last); // Trim the last byte.
-
- return marshaller.unmarshal(bytes, ldr);
- }
- } :
- new JavaObjectSerializer() {
+ protected JavaObjectSerializer h2Serializer() {
+ return new JavaObjectSerializer() {
@Override public byte[] serialize(Object obj) throws Exception {
return marshaller.marshal(obj);
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a06a5575/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQueryParser.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQueryParser.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQueryParser.java
index a8c83d6..2e2f9c3 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQueryParser.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQueryParser.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.processors.query.h2.sql;
import org.apache.ignite.*;
+import org.h2.command.*;
import org.h2.command.dml.*;
import org.h2.engine.*;
import org.h2.expression.*;
@@ -28,7 +29,6 @@ import org.h2.value.*;
import org.jetbrains.annotations.*;
import java.lang.reflect.*;
-import java.sql.*;
import java.util.*;
import java.util.Set;
@@ -151,17 +151,34 @@ public class GridSqlQueryParser {
private static final Getter<JavaFunction, FunctionAlias> FUNC_ALIAS = getter(JavaFunction.class, "functionAlias");
/** */
+ private static final Getter<JdbcPreparedStatement,Command> COMMAND = getter(JdbcPreparedStatement.class, "command");
+
+ /** */
+ private static volatile Getter<Command,Prepared> prepared;
+
+ /** */
private final IdentityHashMap<Object, Object> h2ObjToGridObj = new IdentityHashMap<>();
/**
- * @param conn Connection.
- * @param select Select query.
- * @return Parsed select query.
+ * @param stmt Prepared statement.
+ * @return Parsed select.
*/
- public static GridSqlSelect parse(Connection conn, String select) {
- Session ses = (Session)((JdbcConnection)conn).getSession();
+ public static GridSqlSelect parse(JdbcPreparedStatement stmt) {
+ Command cmd = COMMAND.get(stmt);
+
+ Getter<Command,Prepared> p = prepared;
+
+ if (p == null) {
+ Class<? extends Command> cls = cmd.getClass();
+
+ assert cls.getSimpleName().equals("CommandContainer");
+
+ prepared = p = getter(cls, "prepared");
+ }
+
+ Prepared select = p.get(cmd);
- return new GridSqlQueryParser().parse((Select)ses.prepare(select));
+ return new GridSqlQueryParser().parse((Select)select);
}
/**
@@ -510,7 +527,7 @@ public class GridSqlQueryParser {
* @param cls Class.
* @param fldName Fld name.
*/
- private static <T, R> Getter<T, R> getter(Class<T> cls, String fldName) {
+ private static <T, R> Getter<T, R> getter(Class<? extends T> cls, String fldName) {
Field field;
try {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a06a5575/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
index 019ed59..47e5e05 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
@@ -19,9 +19,9 @@ package org.apache.ignite.internal.processors.query.h2.sql;
import org.apache.ignite.*;
import org.apache.ignite.internal.processors.cache.query.*;
+import org.h2.jdbc.*;
import org.h2.value.*;
-import java.sql.*;
import java.util.*;
import static org.apache.ignite.internal.processors.query.h2.sql.GridSqlFunctionType.*;
@@ -56,16 +56,15 @@ public class GridSqlQuerySplitter {
}
/**
- * @param conn Connection.
- * @param query Query.
+ * @param stmt Prepared statement.
* @param params Parameters.
* @return Two step query.
*/
- public static GridCacheTwoStepQuery split(Connection conn, String query, Object[] params) {
+ public static GridCacheTwoStepQuery split(JdbcPreparedStatement stmt, Object[] params) {
if (params == null)
params = GridCacheSqlQuery.EMPTY_PARAMS;
- GridSqlSelect srcQry = GridSqlQueryParser.parse(conn, query);
+ GridSqlSelect srcQry = GridSqlQueryParser.parse(stmt);
final String mergeTable = TABLE_FUNC_NAME + "()"; // table(0); TODO
@@ -299,16 +298,14 @@ public class GridSqlQuerySplitter {
String mapColAlias = columnName(idx);
String rdcColAlias;
- if (alias == null) { // Wrap map column with generated alias if none.
+ if (alias == null) // Original column name for reduce column.
rdcColAlias = el instanceof GridSqlColumn ? ((GridSqlColumn)el).columnName() : mapColAlias;
-
- alias = alias(mapColAlias, el); // `el` is known not to be alias.
-
- mapSelect.set(idx, alias);
- }
else // Set initial alias for reduce column.
rdcColAlias = alias.alias();
+ // Always wrap map column into generated alias.
+ mapSelect.set(idx, alias(mapColAlias, el)); // `el` is known not to be an alias.
+
if (idx < rdcSelect.length) { // SELECT __C0 AS orginal_alias
GridSqlElement rdcEl = column(mapColAlias);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a06a5575/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index f3d6bfc..4c1dde7 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -200,24 +200,37 @@ public class GridReduceQueryExecutor implements GridMessageListener {
GridMergeIndex idx = r.tbls.get(msg.query()).getScanIndex(null);
- idx.addPage(new GridResultPage(node.id(), msg, false) {
- @Override public void fetchNextPage() {
- if (r.rmtErr != null)
- throw new CacheException("Next page fetch failed.", r.rmtErr);
-
- try {
- GridQueryNextPageRequest msg0 = new GridQueryNextPageRequest(qryReqId, qry, pageSize);
-
- if (node.isLocal())
- h2.mapQueryExecutor().onMessage(ctx.localNodeId(), msg0);
- else
- ctx.io().send(node, GridTopic.TOPIC_QUERY, msg0, GridIoPolicy.PUBLIC_POOL);
- }
- catch (IgniteCheckedException e) {
- throw new CacheException(e);
+ GridResultPage page;
+
+ try {
+ page = new GridResultPage(node.id(), msg, false) {
+ @Override public void fetchNextPage() {
+ if (r.rmtErr != null)
+ throw new CacheException("Next page fetch failed.", r.rmtErr);
+
+ try {
+ GridQueryNextPageRequest msg0 = new GridQueryNextPageRequest(qryReqId, qry, pageSize);
+
+ if (node.isLocal())
+ h2.mapQueryExecutor().onMessage(ctx.localNodeId(), msg0);
+ else
+ ctx.io().send(node, GridTopic.TOPIC_QUERY, msg0, GridIoPolicy.PUBLIC_POOL);
+ }
+ catch (IgniteCheckedException e) {
+ throw new CacheException(e);
+ }
}
- }
- });
+ };
+ }
+ catch (Exception e) {
+ U.error(log, "Error in message.", e);
+
+ fail(r, node.id(), "Error in message.");
+
+ return;
+ }
+
+ idx.addPage(page);
if (msg.allRows() != -1) // Only the first page contains row count.
r.latch.countDown();
|