From: sboikov@apache.org
To: commits@ignite.apache.org
Reply-To: dev@ignite.apache.org
Date: Mon, 29 May 2017 14:32:16 -0000
Message-Id: <97277bf57b7e457bae267cfdcd220b75@git.apache.org>
In-Reply-To: <1b37b15ab6864d2abdc2a6aad8572d52@git.apache.org>
References: <1b37b15ab6864d2abdc2a6aad8572d52@git.apache.org>
Subject: [07/22] ignite git commit: IGNITE-5311: Added ability to get CacheObject value without CacheObjectContext. This closes #2019.
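
This commit lets a CacheObject value be unwrapped through the lighter CacheObjectValueContext instead of a full CacheObjectContext; in the reduce-query path below the context comes from the SQL indexing module via h2.valueContext(). A minimal sketch of the new call shape, assuming an IgniteH2Indexing instance named h2 and a CacheObject named obj (these names are placeholders; only the value(CacheObjectValueContext, boolean) signature and h2.valueContext() are taken from the diff):

    import org.apache.ignite.internal.processors.cache.CacheObject;
    import org.apache.ignite.internal.processors.cache.CacheObjectValueContext;

    // Sketch only: resolve the value context from the indexing module ...
    CacheObjectValueContext valCtx = h2.valueContext();

    // ... and unwrap the stored value; 'false' maps to the 'cpy' flag (do not copy).
    Object plainVal = obj.value(valCtx, false);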
http://git-wip-us.apache.org/repos/asf/ignite/blob/aad3b0c5/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index a31263f..8d9d953 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -37,7 +37,6 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import javax.cache.CacheException;
@@ -100,7 +99,6 @@ import org.jsr166.ConcurrentHashMap8;
 
 import static java.util.Collections.singletonList;
 import static org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion.NONE;
-import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.SQL_FIELDS;
 import static org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery.EMPTY_PARAMS;
 import static org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode.OFF;
 import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.REDUCE;
@@ -132,7 +130,7 @@ public class GridReduceQueryExecutor {
     private final AtomicLong qryIdGen;
 
     /** */
-    private final ConcurrentMap<Long, QueryRun> runs = new ConcurrentHashMap8<>();
+    private final ConcurrentMap<Long, ReduceQueryRun> runs = new ConcurrentHashMap8<>();
 
     /** */
     private volatile List fakeTbls = Collections.emptyList();
@@ -191,8 +189,8 @@ public class GridReduceQueryExecutor {
             @Override public void onEvent(final Event evt) {
                 UUID nodeId = ((DiscoveryEvent)evt).eventNode().id();
 
-                for (QueryRun r : runs.values()) {
-                    for (GridMergeIndex idx : r.idxs) {
+                for (ReduceQueryRun r : runs.values()) {
+                    for (GridMergeIndex idx : r.indexes()) {
                         if (idx.hasSource(nodeId)) {
                             handleNodeLeft(r, nodeId);
 
@@ -208,7 +206,7 @@
      * @param r Query run.
      * @param nodeId Left node ID.
      */
-    private void handleNodeLeft(QueryRun r, UUID nodeId) {
+    private void handleNodeLeft(ReduceQueryRun r, UUID nodeId) {
         // Will attempt to retry. If reduce query was started it will fail on next page fetching.
         retry(r, h2.readyTopologyVersion(), nodeId);
     }
@@ -248,7 +246,7 @@
      * @param msg Message.
      */
     private void onFail(ClusterNode node, GridQueryFailResponse msg) {
-        QueryRun r = runs.get(msg.queryRequestId());
+        ReduceQueryRun r = runs.get(msg.queryRequestId());
 
         fail(r, node.id(), msg.error(), msg.failCode());
     }
@@ -258,7 +256,7 @@
      * @param nodeId Failed node ID.
      * @param msg Error message.
      */
-    private void fail(QueryRun r, UUID nodeId, String msg, byte failCode) {
+    private void fail(ReduceQueryRun r, UUID nodeId, String msg, byte failCode) {
         if (r != null) {
             CacheException e = new CacheException("Failed to execute map query on the node: " + nodeId + ", " + msg);
 
@@ -278,21 +276,21 @@
         final int qry = msg.query();
         final int seg = msg.segmentId();
 
-        final QueryRun r = runs.get(qryReqId);
+        final ReduceQueryRun r = runs.get(qryReqId);
 
         if (r == null) // Already finished with error or canceled.
             return;
 
-        final int pageSize = r.pageSize;
+        final int pageSize = r.pageSize();
 
-        GridMergeIndex idx = r.idxs.get(msg.query());
+        GridMergeIndex idx = r.indexes().get(msg.query());
 
         GridResultPage page;
 
         try {
             page = new GridResultPage(ctx, node.id(), msg) {
                 @Override public void fetchNextPage() {
-                    Object errState = r.state.get();
+                    Object errState = r.state();
 
                     if (errState != null) {
                         CacheException err0 = errState instanceof CacheException ? (CacheException)errState : null;
@@ -335,7 +333,7 @@
         if (msg.retry() != null)
             retry(r, msg.retry(), node.id());
         else if (msg.page() == 0) // Do count down on each first page received.
-            r.latch.countDown();
+            r.latch().countDown();
     }
 
     /**
@@ -343,7 +341,7 @@
      * @param retryVer Retry version.
      * @param nodeId Node ID.
      */
-    private void retry(QueryRun r, AffinityTopologyVersion retryVer, UUID nodeId) {
+    private void retry(ReduceQueryRun r, AffinityTopologyVersion retryVer, UUID nodeId) {
         r.state(retryVer, nodeId);
     }
 
@@ -501,7 +499,7 @@
     }
 
     /**
-     * @param cctx Cache context.
+     * @param schemaName Schema name.
      * @param qry Query.
      * @param keepPortable Keep portable.
      * @param enforceJoinOrder Enforce join order of tables.
@@ -512,7 +510,7 @@
      * @return Rows iterator.
      */
     public Iterator<List<?>> query(
-        GridCacheContext cctx,
+        String schemaName,
         GridCacheTwoStepQuery qry,
         boolean keepPortable,
         boolean enforceJoinOrder,
@@ -541,10 +539,8 @@
 
         final long qryReqId = qryIdGen.incrementAndGet();
 
-        final String cacheName = cctx.name();
-
-        final QueryRun r = new QueryRun(qryReqId, qry.originalSql(), cacheName,
-            h2.connectionForCache(cacheName), qry.mapQueries().size(), qry.pageSize(),
+        final ReduceQueryRun r = new ReduceQueryRun(qryReqId, qry.originalSql(), schemaName,
+            h2.connectionForSchema(schemaName), qry.mapQueries().size(), qry.pageSize(),
             U.currentTimeMillis(), cancel);
 
         AffinityTopologyVersion topVer = h2.readyTopologyVersion();
@@ -633,7 +629,7 @@
                 GridMergeTable tbl;
 
                 try {
-                    tbl = createMergeTable(r.conn, mapQry, qry.explain());
+                    tbl = createMergeTable(r.connection(), mapQry, qry.explain());
                 }
                 catch (IgniteCheckedException e) {
                     throw new IgniteException(e);
@@ -641,7 +637,7 @@
 
                 idx = tbl.getMergeIndex();
 
-                fakeTable(r.conn, tblIdx++).innerTable(tbl);
+                fakeTable(r.connection(), tblIdx++).innerTable(tbl);
             }
             else
                 idx = GridMergeIndexUnsorted.createDummy(ctx);
@@ -659,13 +655,13 @@
             else
                 idx.setSources(nodes, segmentsPerIndex);
 
-            idx.setPageSize(r.pageSize);
+            idx.setPageSize(r.pageSize());
 
-            r.idxs.add(idx);
+            r.indexes().add(idx);
         }
 
-        r.latch = new CountDownLatch(isReplicatedOnly ? 1 :
-            (r.idxs.size() - replicatedQrysCnt) * nodes.size() * segmentsPerIndex + replicatedQrysCnt);
+        r.latch(new CountDownLatch(isReplicatedOnly ? 1 :
+            (r.indexes().size() - replicatedQrysCnt) * nodes.size() * segmentsPerIndex + replicatedQrysCnt));
 
         runs.put(qryReqId, r);
 
@@ -719,7 +715,7 @@
                     new GridH2QueryRequest()
                         .requestId(qryReqId)
                         .topologyVersion(topVer)
-                        .pageSize(r.pageSize)
+                        .pageSize(r.pageSize())
                         .caches(qry.cacheIds())
                         .tables(distributedJoins ? qry.tables() : null)
                         .partitions(convert(partsMap))
@@ -731,7 +727,7 @@
                 false)) {
                 awaitAllReplies(r, nodes, cancel);
 
-                Object state = r.state.get();
+                Object state = r.state();
 
                 if (state != null) {
                     if (state instanceof CacheException) {
@@ -764,7 +760,7 @@
                     List<List<?>> res = new ArrayList<>();
 
                     // Simple UNION ALL can have multiple indexes.
-                    for (GridMergeIndex idx : r.idxs) {
+                    for (GridMergeIndex idx : r.indexes()) {
                         Cursor cur = idx.findInStream(null, null);
 
                         while (cur.next()) {
@@ -788,21 +784,19 @@
 
             UUID locNodeId = ctx.localNodeId();
 
-            H2Utils.setupConnection(r.conn, false, enforceJoinOrder);
+            H2Utils.setupConnection(r.connection(), false, enforceJoinOrder);
 
             GridH2QueryContext.set(new GridH2QueryContext(locNodeId, locNodeId, qryReqId, REDUCE)
-                .pageSize(r.pageSize).distributedJoinMode(OFF));
+                .pageSize(r.pageSize()).distributedJoinMode(OFF));
 
             try {
-                String schema = h2.schema(cacheName);
-
                 if (qry.explain())
-                    return explainPlan(r.conn, schema, qry, params);
+                    return explainPlan(r.connection(), schemaName, qry, params);
 
                 GridCacheSqlQuery rdc = qry.reduceQuery();
 
-                ResultSet res = h2.executeSqlQueryWithTimer(schema,
-                    r.conn,
+                ResultSet res = h2.executeSqlQueryWithTimer(schemaName,
+                    r.connection(),
                     rdc.query(),
                     F.asList(rdc.parameters(params)),
                     false, // The statement will cache some extra thread local objects.
@@ -824,10 +818,10 @@
                     continue;
                 }
 
-                return new GridQueryCacheObjectsIterator(resIter, cctx, keepPortable);
+                return new GridQueryCacheObjectsIterator(resIter, h2.valueContext(), keepPortable);
             }
             catch (IgniteCheckedException | RuntimeException e) {
-                U.closeQuiet(r.conn);
+                U.closeQuiet(r.connection());
 
                 if (e instanceof CacheException) {
                     if (wasCancelled((CacheException)e))
@@ -898,7 +892,7 @@
      * @param distributedJoins Distributed join flag.
      */
     private void cancelRemoteQueriesIfNeeded(Collection nodes,
-        QueryRun r,
+        ReduceQueryRun r,
         long qryReqId,
         boolean distributedJoins)
     {
@@ -906,7 +900,7 @@
         if (distributedJoins)
             send(nodes, new GridQueryCancelRequest(qryReqId), null, false);
         else {
-            for (GridMergeIndex idx : r.idxs) {
+            for (GridMergeIndex idx : r.indexes()) {
                 if (!idx.fetchedAll()) {
                     send(nodes, new GridQueryCancelRequest(qryReqId), null, false);
 
@@ -922,9 +916,9 @@
      * @param cancel Query cancel.
      * @throws IgniteInterruptedCheckedException If interrupted.
      */
-    private void awaitAllReplies(QueryRun r, Collection nodes, GridQueryCancel cancel)
+    private void awaitAllReplies(ReduceQueryRun r, Collection nodes, GridQueryCancel cancel)
         throws IgniteInterruptedCheckedException, QueryCancelledException {
-        while (!U.await(r.latch, 500, TimeUnit.MILLISECONDS)) {
+        while (!U.await(r.latch(), 500, TimeUnit.MILLISECONDS)) {
 
             cancel.checkCancelled();
 
@@ -932,7 +926,7 @@
                 if (!ctx.discovery().alive(node)) {
                     handleNodeLeft(r, node.id());
 
-                    assert r.latch.getCount() == 0;
+                    assert r.latch().getCount() == 0;
 
                     return;
                 }
@@ -1420,7 +1414,7 @@
         CacheException err = new CacheException("Query was cancelled, client node disconnected.",
             new IgniteClientDisconnectedException(reconnectFut, "Client node disconnected."));
 
-        for (Map.Entry<Long, QueryRun> e : runs.entrySet())
+        for (Map.Entry<Long, ReduceQueryRun> e : runs.entrySet())
             e.getValue().disconnected(err);
     }
 
@@ -1435,9 +1429,9 @@
 
         long curTime = U.currentTimeMillis();
 
-        for (QueryRun run : runs.values()) {
-            if (run.qry.longQuery(curTime, duration))
-                res.add(run.qry);
+        for (ReduceQueryRun run : runs.values()) {
+            if (run.queryInfo().longQuery(curTime, duration))
+                res.add(run.queryInfo());
         }
 
         return res;
@@ -1450,77 +1444,10 @@
      */
     public void cancelQueries(Collection queries) {
         for (Long qryId : queries) {
-            QueryRun run = runs.get(qryId);
+            ReduceQueryRun run = runs.get(qryId);
 
             if (run != null)
-                run.qry.cancel();
-        }
-    }
-
-    /**
-     * Query run.
-     */
-    private static class QueryRun {
-        /** */
-        private final GridRunningQueryInfo qry;
-
-        /** */
-        private final List<GridMergeIndex> idxs;
-
-        /** */
-        private CountDownLatch latch;
-
-        /** */
-        private final JdbcConnection conn;
-
-        /** */
-        private final int pageSize;
-
-        /** Can be either CacheException in case of error or AffinityTopologyVersion to retry if needed. */
-        private final AtomicReference<Object> state = new AtomicReference<>();
-
-        /**
-         * @param id Query ID.
-         * @param qry Query text.
-         * @param cache Cache where query was executed.
-         * @param conn Connection.
-         * @param idxsCnt Number of indexes.
-         * @param pageSize Page size.
-         * @param startTime Start time.
-         * @param cancel Query cancel handler.
-         */
-        private QueryRun(Long id, String qry, String cache, Connection conn, int idxsCnt, int pageSize, long startTime, GridQueryCancel cancel) {
-            this.qry = new GridRunningQueryInfo(id, qry, SQL_FIELDS, cache, startTime, cancel, false);
-            this.conn = (JdbcConnection)conn;
-            this.idxs = new ArrayList<>(idxsCnt);
-            this.pageSize = pageSize > 0 ? pageSize : GridCacheTwoStepQuery.DFLT_PAGE_SIZE;
-        }
-
-        /**
-         * @param o Fail state object.
-         * @param nodeId Node ID.
-         */
-        void state(Object o, @Nullable UUID nodeId) {
-            assert o != null;
-            assert o instanceof CacheException || o instanceof AffinityTopologyVersion : o.getClass();
-
-            if (!state.compareAndSet(null, o))
-                return;
-
-            while (latch.getCount() != 0) // We don't need to wait for all nodes to reply.
-                latch.countDown();
-
-            CacheException e = o instanceof CacheException ? (CacheException) o : null;
-
-            for (GridMergeIndex idx : idxs) // Fail all merge indexes.
-                idx.fail(nodeId, e);
-        }
-
-        /**
-         * @param e Error.
-         */
-        void disconnected(CacheException e) {
-            state(e, null);
+                run.queryInfo().cancel();
         }
     }
 
http://git-wip-us.apache.org/repos/asf/ignite/blob/aad3b0c5/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceQueryRun.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceQueryRun.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceQueryRun.java
new file mode 100644
index 0000000..73bb002
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceQueryRun.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.twostep;
+
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.query.GridCacheTwoStepQuery;
+import org.apache.ignite.internal.processors.query.GridQueryCancel;
+import org.apache.ignite.internal.processors.query.GridRunningQueryInfo;
+import org.h2.jdbc.JdbcConnection;
+import org.jetbrains.annotations.Nullable;
+
+import javax.cache.CacheException;
+import java.sql.Connection;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.SQL_FIELDS;
+
+/**
+ * Query run.
+ */
+class ReduceQueryRun {
+    /** */
+    private final GridRunningQueryInfo qry;
+
+    /** */
+    private final List<GridMergeIndex> idxs;
+
+    /** */
+    private CountDownLatch latch;
+
+    /** */
+    private final JdbcConnection conn;
+
+    /** */
+    private final int pageSize;
+
+    /** Can be either CacheException in case of error or AffinityTopologyVersion to retry if needed. */
+    private final AtomicReference<Object> state = new AtomicReference<>();
+
+    /**
+     * Constructor.
+     *
+     * @param id Query ID.
+     * @param qry Query text.
+     * @param schemaName Schema name.
+     * @param conn Connection.
+     * @param idxsCnt Number of indexes.
+     * @param pageSize Page size.
+     * @param startTime Start time.
+     * @param cancel Query cancel handler.
+     */
+    ReduceQueryRun(Long id, String qry, String schemaName, Connection conn, int idxsCnt, int pageSize, long startTime,
+        GridQueryCancel cancel) {
+        this.qry = new GridRunningQueryInfo(id, qry, SQL_FIELDS, schemaName, startTime, cancel, false);
+
+        this.conn = (JdbcConnection)conn;
+
+        this.idxs = new ArrayList<>(idxsCnt);
+
+        this.pageSize = pageSize > 0 ? pageSize : GridCacheTwoStepQuery.DFLT_PAGE_SIZE;
+    }
+
+    /**
+     * @param o Fail state object.
+     * @param nodeId Node ID.
+     */
+    void state(Object o, @Nullable UUID nodeId) {
+        assert o != null;
+        assert o instanceof CacheException || o instanceof AffinityTopologyVersion : o.getClass();
+
+        if (!state.compareAndSet(null, o))
+            return;
+
+        while (latch.getCount() != 0) // We don't need to wait for all nodes to reply.
+            latch.countDown();
+
+        CacheException e = o instanceof CacheException ? (CacheException) o : null;
+
+        for (GridMergeIndex idx : idxs) // Fail all merge indexes.
+            idx.fail(nodeId, e);
+    }
+
+    /**
+     * @param e Error.
+     */
+    void disconnected(CacheException e) {
+        state(e, null);
+    }
+
+    /**
+     * @return Query info.
+     */
+    GridRunningQueryInfo queryInfo() {
+        return qry;
+    }
+
+    /**
+     * @return Page size.
+     */
+    int pageSize() {
+        return pageSize;
+    }
+
+    /**
+     * @return Connection.
+     */
+    JdbcConnection connection() {
+        return conn;
+    }
+
+    /**
+     * @return State.
+     */
+    Object state() {
+        return state.get();
+    }
+
+    /**
+     * @return Indexes.
+     */
+    List<GridMergeIndex> indexes() {
+        return idxs;
+    }
+
+    /**
+     * @return Latch.
+     */
+    CountDownLatch latch() {
+        return latch;
+    }
+
+    /**
+     * @param latch Latch.
+     */
+    void latch(CountDownLatch latch) {
+        this.latch = latch;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/aad3b0c5/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexingSpiAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexingSpiAbstractSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexingSpiAbstractSelfTest.java
index 5ac02a5..1f73dcb 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexingSpiAbstractSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexingSpiAbstractSelfTest.java
@@ -41,6 +41,7 @@ import org.apache.ignite.internal.binary.BinaryMarshaller;
 import org.apache.ignite.internal.binary.BinaryObjectImpl;
 import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.CacheObjectContext;
+import org.apache.ignite.internal.processors.cache.CacheObjectValueContext;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.query.GridQueryFieldsResult;
 import org.apache.ignite.internal.processors.query.GridQueryIndexDescriptor;
@@ -726,7 +727,7 @@ public abstract class GridIndexingSpiAbstractSelfTest extends GridCommonAbstract
         }
 
         /** {@inheritDoc} */
-        @Nullable @Override public <T> T value(CacheObjectContext ctx, boolean cpy) {
+        @Nullable @Override public <T> T value(CacheObjectValueContext ctx, boolean cpy) {
             return (T)val;
         }
 
http://git-wip-us.apache.org/repos/asf/ignite/blob/aad3b0c5/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/sql/GridQueryParsingTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/sql/GridQueryParsingTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/sql/GridQueryParsingTest.java
index 5939b59..b66a343 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/sql/GridQueryParsingTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/sql/GridQueryParsingTest.java
@@ -870,7 +870,9 @@ public class GridQueryParsingTest extends GridCommonAbstractTest {
 
         IgniteH2Indexing idx = U.field(qryProcessor, "idx");
 
-        return (JdbcConnection)idx.connectionForCache(DEFAULT_CACHE_NAME);
+        String schemaName = idx.schema(DEFAULT_CACHE_NAME);
+
+        return (JdbcConnection)idx.connectionForSchema(schemaName);
     }
 
     /**
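
A related usage sketch (not part of the diff): H2 connections are now obtained per schema rather than per cache, so a caller first resolves the schema name for a cache and then asks for the connection, as the updated GridQueryParsingTest does above. This assumes an IgniteH2Indexing instance named idx and a cache name in cacheName (placeholders; the two calls themselves appear in the diff):

    // Map the cache name to its SQL schema, then get the H2 connection for that schema.
    String schemaName = idx.schema(cacheName);
    JdbcConnection conn = (JdbcConnection)idx.connectionForSchema(schemaName);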