Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 9991D200D37 for ; Wed, 25 Oct 2017 14:16:07 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 97D2B1609E5; Wed, 25 Oct 2017 12:16:07 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 6CC82160BDA for ; Wed, 25 Oct 2017 14:16:05 +0200 (CEST) Received: (qmail 21853 invoked by uid 500); 25 Oct 2017 12:16:04 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 21822 invoked by uid 99); 25 Oct 2017 12:16:04 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 25 Oct 2017 12:16:04 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 687F4DFC64; Wed, 25 Oct 2017 12:16:04 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sboikov@apache.org To: commits@ignite.apache.org Date: Wed, 25 Oct 2017 12:16:05 -0000 Message-Id: <00b135dcb9f444968a08e812e4cc66d7@git.apache.org> In-Reply-To: <3e65ed4d31184afb94ceb366dccb7764@git.apache.org> References: <3e65ed4d31184afb94ceb366dccb7764@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [2/4] ignite git commit: ignite-3478 Mvcc support for sql indexes archived-at: Wed, 25 Oct 2017 12:16:07 -0000 http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2LeafIO.java 
---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2LeafIO.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2LeafIO.java index 85dcf50..8954de0 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2LeafIO.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2LeafIO.java @@ -17,20 +17,12 @@ package org.apache.ignite.internal.processors.query.h2.database.io; -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.internal.pagemem.PageUtils; -import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree; -import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusIO; -import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusLeafIO; import org.apache.ignite.internal.processors.cache.persistence.tree.io.IOVersions; -import org.apache.ignite.internal.processors.query.h2.database.H2Tree; -import org.apache.ignite.internal.processors.query.h2.opt.GridH2Row; -import org.h2.result.SearchRow; /** * Leaf page for H2 row references. */ -public class H2LeafIO extends BPlusLeafIO implements H2RowLinkIO { +public class H2LeafIO extends AbstractH2LeafIO { /** */ public static final IOVersions VERSIONS = new IOVersions<>( new H2LeafIO(1) @@ -39,36 +31,7 @@ public class H2LeafIO extends BPlusLeafIO implements H2RowLinkIO { /** * @param ver Page format version. 
*/ - protected H2LeafIO(int ver) { + private H2LeafIO(int ver) { super(T_H2_REF_LEAF, ver, 8); } - - /** {@inheritDoc} */ - @Override public void storeByOffset(long pageAddr, int off, SearchRow row) { - GridH2Row row0 = (GridH2Row)row; - - assert row0.link() != 0; - - PageUtils.putLong(pageAddr, off, row0.link()); - } - - /** {@inheritDoc} */ - @Override public void store(long dstPageAddr, int dstIdx, BPlusIO srcIo, long srcPageAddr, int srcIdx) { - assert srcIo == this; - - PageUtils.putLong(dstPageAddr, offset(dstIdx), getLink(srcPageAddr, srcIdx)); - } - - /** {@inheritDoc} */ - @Override public SearchRow getLookupRow(BPlusTree tree, long pageAddr, int idx) - throws IgniteCheckedException { - long link = getLink(pageAddr, idx); - - return ((H2Tree)tree).getRowFactory().getRow(link); - } - - /** {@inheritDoc} */ - @Override public long getLink(long pageAddr, int idx) { - return PageUtils.getLong(pageAddr, offset(idx)); - } } http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2MvccExtrasInnerIO.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2MvccExtrasInnerIO.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2MvccExtrasInnerIO.java new file mode 100644 index 0000000..fa6978e --- /dev/null +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2MvccExtrasInnerIO.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.h2.database.io; + +import org.apache.ignite.internal.pagemem.PageUtils; + +/** + * + */ +class H2MvccExtrasInnerIO extends AbstractH2ExtrasInnerIO { + /** */ + private final int crdVerOff; + + /** */ + private final int cntrOff; + + /** */ + private final int newCrdVerOff; + + /** */ + private final int newCntrOff; + + /** + * @param type Page type. + * @param ver Page format version. + * @param payloadSize Payload size. 
+ */ + H2MvccExtrasInnerIO(short type, int ver, int payloadSize) { + super(type, ver, 40, payloadSize); + + crdVerOff = payloadSize + 8; + cntrOff = payloadSize + 16; + newCrdVerOff = payloadSize + 24; + newCntrOff = payloadSize + 32; + } + + /** {@inheritDoc} */ + @Override public long getMvccCoordinatorVersion(long pageAddr, int idx) { + return PageUtils.getLong(pageAddr, offset(idx) + crdVerOff); + } + + /** {@inheritDoc} */ + @Override public long getMvccCounter(long pageAddr, int idx) { + return PageUtils.getLong(pageAddr, offset(idx) + cntrOff); + } + + /** {@inheritDoc} */ + @Override public long getNewMvccCoordinatorVersion(long pageAddr, int idx) { + return PageUtils.getLong(pageAddr, offset(idx) + newCrdVerOff); + } + + /** {@inheritDoc} */ + @Override public long getNewMvccCounter(long pageAddr, int idx) { + return PageUtils.getLong(pageAddr, offset(idx) + newCntrOff); + } + + /** {@inheritDoc} */ + @Override public boolean storeMvccInfo() { + return true; + } +} + http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2MvccExtrasLeafIO.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2MvccExtrasLeafIO.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2MvccExtrasLeafIO.java new file mode 100644 index 0000000..2448e76 --- /dev/null +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2MvccExtrasLeafIO.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.h2.database.io; + +import org.apache.ignite.internal.pagemem.PageUtils; + +/** + * Leaf page for H2 row references. + */ +class H2MvccExtrasLeafIO extends AbstractH2ExtrasLeafIO { + /** */ + private final int crdVerOff; + + /** */ + private final int cntrOff; + + /** */ + private final int newCrdVerOff; + + /** */ + private final int newCntrOff; + + /** + * @param type Page type. + * @param ver Page format version. + * @param payloadSize Payload size. 
+ */ + H2MvccExtrasLeafIO(short type, int ver, int payloadSize) { + super(type, ver, 40, payloadSize); + + crdVerOff = payloadSize + 8; + cntrOff = payloadSize + 16; + newCrdVerOff = payloadSize + 24; + newCntrOff = payloadSize + 32; + } + + /** {@inheritDoc} */ + @Override public long getMvccCoordinatorVersion(long pageAddr, int idx) { + return PageUtils.getLong(pageAddr, offset(idx) + crdVerOff); + } + + /** {@inheritDoc} */ + @Override public long getMvccCounter(long pageAddr, int idx) { + return PageUtils.getLong(pageAddr, offset(idx) + cntrOff); + } + + /** {@inheritDoc} */ + @Override public long getNewMvccCoordinatorVersion(long pageAddr, int idx) { + return PageUtils.getLong(pageAddr, offset(idx) + newCrdVerOff); + } + + /** {@inheritDoc} */ + @Override public long getNewMvccCounter(long pageAddr, int idx) { + return PageUtils.getLong(pageAddr, offset(idx) + newCntrOff); + } + + /** {@inheritDoc} */ + @Override public boolean storeMvccInfo() { + return true; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2MvccInnerIO.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2MvccInnerIO.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2MvccInnerIO.java new file mode 100644 index 0000000..e64ab43 --- /dev/null +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2MvccInnerIO.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.h2.database.io; + +import org.apache.ignite.internal.processors.cache.persistence.tree.io.IOVersions; + +/** + * Inner page for H2 row references. + */ +public class H2MvccInnerIO extends AbstractH2InnerIO { + /** */ + public static final IOVersions VERSIONS = new IOVersions<>( + new H2MvccInnerIO(1) + ); + + /** + * @param ver Page format version. + */ + private H2MvccInnerIO(int ver) { + super(T_H2_MVCC_REF_INNER, ver, 40); + } + + /** {@inheritDoc} */ + @Override public boolean storeMvccInfo() { + return true; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2MvccLeafIO.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2MvccLeafIO.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2MvccLeafIO.java new file mode 100644 index 0000000..a364432 --- /dev/null +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2MvccLeafIO.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.h2.database.io; + +import org.apache.ignite.internal.processors.cache.persistence.tree.io.IOVersions; + +/** + * + */ +public class H2MvccLeafIO extends AbstractH2LeafIO { + /** */ + public static final IOVersions VERSIONS = new IOVersions<>( + new H2MvccLeafIO(1) + ); + + /** + * @param ver Page format version. 
+ */ + private H2MvccLeafIO(int ver) { + super(T_H2_MVCC_REF_LEAF, ver, 40); + } + + /** {@inheritDoc} */ + @Override public boolean storeMvccInfo() { + return true; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2RowLinkIO.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2RowLinkIO.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2RowLinkIO.java index ce69197..d828c44 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2RowLinkIO.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2RowLinkIO.java @@ -27,4 +27,37 @@ public interface H2RowLinkIO { * @return Row link. */ public long getLink(long pageAddr, int idx); + + /** + * @param pageAddr Page address. + * @param idx Index. + * @return Mvcc coordinator version. + */ + public long getMvccCoordinatorVersion(long pageAddr, int idx); + + /** + * @param pageAddr Page address. + * @param idx Index. + * @return Mvcc counter. + */ + public long getMvccCounter(long pageAddr, int idx); + + /** + * @param pageAddr Page address. + * @param idx Index. + * @return New mvcc coordinator version. + */ + public long getNewMvccCoordinatorVersion(long pageAddr, int idx); + + /** + * @param pageAddr Page address. + * @param idx Index. + * @return New mvcc counter. + */ + public long getNewMvccCounter(long pageAddr, int idx); + + /** + * @return {@code True} if IO stores mvcc information. 
+ */ + public boolean storeMvccInfo(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java index 92b7d10..96b331a 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java @@ -28,6 +28,7 @@ import org.apache.ignite.internal.managers.communication.GridIoPolicy; import org.apache.ignite.internal.managers.communication.GridMessageListener; import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.query.h2.database.H2TreeMvccFilterClosure; import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2IndexRangeRequest; import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2IndexRangeResponse; import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2RowMessage; @@ -200,6 +201,12 @@ public abstract class GridH2IndexBase extends BaseIndex { public abstract GridH2Row put(GridH2Row row); /** + * @param row Row. + * @return {@code True} if replaced existing row. + */ + public abstract boolean putx(GridH2Row row); + + /** * Remove row from index. * * @param row Row. @@ -426,7 +433,7 @@ public abstract class GridH2IndexBase extends BaseIndex { // This is the first request containing all the search rows. 
assert !msg.bounds().isEmpty() : "empty bounds"; - src = new RangeSource(msg.bounds(), msg.segment(), qctx.filter()); + src = new RangeSource(msg.bounds(), msg.segment(), qctx.filter(), qctx.mvccFilter()); } else { // This is request to fetch next portion of data. @@ -1469,20 +1476,28 @@ public abstract class GridH2IndexBase extends BaseIndex { /** */ final IndexingQueryFilter filter; + /** */ + private final H2TreeMvccFilterClosure mvccFilter; + /** Iterator. */ Iterator iter = emptyIterator(); /** * @param bounds Bounds. + * @param segment Segment. * @param filter Filter. + * @param mvccFilter Mvcc filter. */ RangeSource( Iterable bounds, int segment, - IndexingQueryFilter filter + IndexingQueryFilter filter, + H2TreeMvccFilterClosure mvccFilter ) { this.segment = segment; this.filter = filter; + this.mvccFilter = mvccFilter; + boundsIter = bounds.iterator(); } @@ -1540,7 +1555,7 @@ public abstract class GridH2IndexBase extends BaseIndex { IgniteTree t = treeForRead(segment); - iter = new CursorIteratorWrapper(doFind0(t, first, true, last, filter)); + iter = new CursorIteratorWrapper(doFind0(t, first, last, filter, mvccFilter)); if (!iter.hasNext()) { // We have to return empty range here. @@ -1565,17 +1580,17 @@ public abstract class GridH2IndexBase extends BaseIndex { /** * @param t Tree. * @param first Lower bound. - * @param includeFirst Whether lower bound should be inclusive. * @param last Upper bound always inclusive. * @param filter Filter. + * @param mvccFilter Mvcc filter. * @return Iterator over rows in given range. 
*/ protected GridCursor doFind0( IgniteTree t, @Nullable SearchRow first, - boolean includeFirst, @Nullable SearchRow last, - IndexingQueryFilter filter) { + IndexingQueryFilter filter, + H2TreeMvccFilterClosure mvccFilter) { throw new UnsupportedOperationException(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2KeyValueRowOnheap.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2KeyValueRowOnheap.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2KeyValueRowOnheap.java index e855536..62b459a 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2KeyValueRowOnheap.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2KeyValueRowOnheap.java @@ -18,6 +18,8 @@ package org.apache.ignite.internal.processors.query.h2.opt; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor; +import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion; import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow; import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor; import org.apache.ignite.internal.util.typedef.internal.SB; @@ -56,17 +58,24 @@ public class GridH2KeyValueRowOnheap extends GridH2Row { /** */ private Value ver; + /** */ + private final MvccCoordinatorVersion newVer; + /** * Constructor. * * @param desc Row descriptor. * @param row Row. + * @param newVer Version of new mvcc value inserted for the same key. * @param keyType Key type. * @param valType Value type. * @throws IgniteCheckedException If failed. 
*/ - public GridH2KeyValueRowOnheap(GridH2RowDescriptor desc, CacheDataRow row, int keyType, int valType) - throws IgniteCheckedException { + public GridH2KeyValueRowOnheap(GridH2RowDescriptor desc, + CacheDataRow row, + MvccCoordinatorVersion newVer, + int keyType, + int valType) throws IgniteCheckedException { super(row); this.desc = desc; @@ -78,6 +87,23 @@ public class GridH2KeyValueRowOnheap extends GridH2Row { if (row.version() != null) this.ver = desc.wrap(row.version(), Value.JAVA_OBJECT); + + this.newVer = newVer; + } + + /** {@inheritDoc} */ + @Override public long newMvccCoordinatorVersion() { + return newVer != null ? newVer.coordinatorVersion() : 0; + } + + /** {@inheritDoc} */ + @Override public long newMvccCounter() { + return newVer != null ? newVer.counter(): CacheCoordinatorsProcessor.MVCC_COUNTER_NA; + } + + /** {@inheritDoc} */ + @Override public boolean indexSearchRow() { + return false; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2MetaTable.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2MetaTable.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2MetaTable.java index 5e09a86..38ad9d0 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2MetaTable.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2MetaTable.java @@ -284,6 +284,11 @@ public class GridH2MetaTable extends TableBase { throw new IllegalStateException("Index: " + idx); } } + + /** {@inheritDoc} */ + @Override public boolean indexSearchRow() { + return false; // TODO IGNITE-3478, check meta table with mvcc. 
+ } } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2PlainRowFactory.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2PlainRowFactory.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2PlainRowFactory.java index fd8a613..d24dc08 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2PlainRowFactory.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2PlainRowFactory.java @@ -70,7 +70,7 @@ public class GridH2PlainRowFactory extends RowFactory { /** * @param key Key. */ - public RowKey(Value key) { + RowKey(Value key) { this.key = key; } @@ -92,6 +92,11 @@ public class GridH2PlainRowFactory extends RowFactory { } /** {@inheritDoc} */ + @Override public boolean indexSearchRow() { + return true; + } + + /** {@inheritDoc} */ @Override public String toString() { return S.toString(RowKey.class, this); } @@ -138,6 +143,11 @@ public class GridH2PlainRowFactory extends RowFactory { } /** {@inheritDoc} */ + @Override public boolean indexSearchRow() { + return true; + } + + /** {@inheritDoc} */ @Override public String toString() { return S.toString(RowPair.class, this); } @@ -174,6 +184,11 @@ public class GridH2PlainRowFactory extends RowFactory { } /** {@inheritDoc} */ + @Override public boolean indexSearchRow() { + return true; + } + + /** {@inheritDoc} */ @Override public String toString() { return S.toString(RowSimple.class, this); } http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryContext.java ---------------------------------------------------------------------- diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryContext.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryContext.java index 91f0aef..b490179 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryContext.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryContext.java @@ -25,6 +25,7 @@ import java.util.concurrent.ConcurrentMap; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.distributed.dht.GridReservable; +import org.apache.ignite.internal.processors.query.h2.database.H2TreeMvccFilterClosure; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.spi.indexing.IndexingQueryFilter; @@ -83,6 +84,9 @@ public class GridH2QueryContext { /** */ private GridH2CollocationModel qryCollocationMdl; + /** */ + private H2TreeMvccFilterClosure mvccFilter; + /** * @param locNodeId Local node ID. * @param nodeId The node who initiated the query. @@ -102,13 +106,34 @@ public class GridH2QueryContext { * @param segmentId Index segment ID. * @param type Query type. */ - public GridH2QueryContext(UUID locNodeId, UUID nodeId, long qryId, int segmentId, GridH2QueryType type) { + public GridH2QueryContext(UUID locNodeId, + UUID nodeId, + long qryId, + int segmentId, + GridH2QueryType type) { assert segmentId == 0 || type == MAP; key = new Key(locNodeId, nodeId, qryId, segmentId, type); } /** + * @return Mvcc filter. + */ + @Nullable public H2TreeMvccFilterClosure mvccFilter() { + return mvccFilter; + } + + /** + * @param mvccFilter Mvcc filter. + * @return {@code this}. 
+ */ + public GridH2QueryContext mvccFilter(H2TreeMvccFilterClosure mvccFilter) { + this.mvccFilter = mvccFilter; + + return this; + } + + /** * @return Type. */ public GridH2QueryType type() { http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Row.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Row.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Row.java index 54e0417..785b791 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Row.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Row.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.query.h2.opt; import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.KeyCacheObject; +import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor; import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; @@ -88,16 +89,35 @@ public abstract class GridH2Row extends GridH2SearchRowAdapter implements CacheD /** {@inheritDoc} */ @Override public long mvccCoordinatorVersion() { - throw new UnsupportedOperationException(); + return row.mvccCoordinatorVersion(); } /** {@inheritDoc} */ @Override public long mvccCounter() { - throw new UnsupportedOperationException(); + return row.mvccCounter(); } /** {@inheritDoc} */ @Override public boolean removed() { throw new UnsupportedOperationException(); } + + /** {@inheritDoc} */ + @Override public boolean indexSearchRow() { + return false; + } + + /** + * @return Part of new mvcc version. 
+ */ + public long newMvccCoordinatorVersion() { + return 0; + } + + /** + * @return Part of new mvcc version. + */ + public long newMvccCounter() { + return CacheCoordinatorsProcessor.MVCC_COUNTER_NA; + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2RowDescriptor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2RowDescriptor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2RowDescriptor.java index 1d915e5..ad91deb 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2RowDescriptor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2RowDescriptor.java @@ -29,6 +29,7 @@ import java.util.UUID; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion; import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow; import org.apache.ignite.internal.processors.query.GridQueryProperty; import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor; @@ -58,6 +59,7 @@ import org.h2.value.ValueString; import org.h2.value.ValueTime; import org.h2.value.ValueTimestamp; import org.h2.value.ValueUuid; +import org.jetbrains.annotations.Nullable; import static org.apache.ignite.internal.processors.query.h2.opt.GridH2KeyValueRowOnheap.DEFAULT_COLUMNS_COUNT; import static org.apache.ignite.internal.processors.query.h2.opt.GridH2KeyValueRowOnheap.KEY_COL; @@ -273,17 +275,21 @@ public class GridH2RowDescriptor { * Creates new row. * * @param dataRow Data row. 
+ * @param newVer Version of new mvcc value inserted for the same key. * @return Row. * @throws IgniteCheckedException If failed. */ - public GridH2Row createRow(CacheDataRow dataRow) throws IgniteCheckedException { + public GridH2Row createRow(CacheDataRow dataRow, @Nullable MvccCoordinatorVersion newVer) throws IgniteCheckedException { GridH2Row row; try { - if (dataRow.value() == null) // Only can happen for remove operation, can create simple search row. + if (dataRow.value() == null) { // Only can happen for remove operation, can create simple search row. + assert newVer == null; + row = new GridH2KeyRowOnheap(dataRow, wrap(dataRow.key(), keyType)); + } else - row = new GridH2KeyValueRowOnheap(this, dataRow, keyType, valType); + row = new GridH2KeyValueRowOnheap(this, dataRow, newVer, keyType, valType); } catch (ClassCastException e) { throw new IgniteCheckedException("Failed to convert key to SQL type. " + http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2SearchRow.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2SearchRow.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2SearchRow.java new file mode 100644 index 0000000..4b3940c --- /dev/null +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2SearchRow.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.h2.opt; + +import org.apache.ignite.internal.processors.query.h2.database.H2Tree; +import org.h2.result.Row; + +/** + * + */ +public interface GridH2SearchRow extends Row { + /** + * @return Mvcc coordinator version. + */ + public long mvccCoordinatorVersion(); + + /** + * @return Mvcc counter. + */ + public long mvccCounter(); + + /** + * @return {@code True} for rows used for index search (as opposed to rows stored in {@link H2Tree}). + */ + public boolean indexSearchRow(); +} http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2SearchRowAdapter.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2SearchRowAdapter.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2SearchRowAdapter.java index 24a90b3..4fc8ee5 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2SearchRowAdapter.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2SearchRowAdapter.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.processors.query.h2.opt; +import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor; import org.h2.result.Row; import org.h2.result.SearchRow; import org.h2.store.Data; @@ -25,7 +26,7 @@ import org.h2.value.Value; /** * 
Dummy H2 search row adadpter. */ -public abstract class GridH2SearchRowAdapter implements Row { +public abstract class GridH2SearchRowAdapter implements GridH2SearchRow { /** {@inheritDoc} */ @Override public void setKeyAndVersion(SearchRow old) { throw new UnsupportedOperationException(); @@ -100,4 +101,14 @@ public abstract class GridH2SearchRowAdapter implements Row { @Override public Value[] getValueList() { throw new UnsupportedOperationException(); } + + /** {@inheritDoc} */ + @Override public long mvccCoordinatorVersion() { + return 0; + } + + /** {@inheritDoc} */ + @Override public long mvccCounter() { + return CacheCoordinatorsProcessor.MVCC_COUNTER_NA; + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java index 6c353e9..ca9c1f5 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java @@ -28,6 +28,8 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteInterruptedException; import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor; +import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion; import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow; import org.apache.ignite.internal.processors.cache.query.QueryTable; import org.apache.ignite.internal.processors.query.IgniteSQLException; @@ 
-399,15 +401,16 @@ public class GridH2Table extends TableBase { * otherwise value and expiration time will be updated or new row will be added. * * @param row Row. + * @param newVer Version of new mvcc value inserted for the same key. * @param rmv If {@code true} then remove, else update row. * @return {@code true} If operation succeeded. * @throws IgniteCheckedException If failed. */ - public boolean update(CacheDataRow row, boolean rmv) + public boolean update(CacheDataRow row, @Nullable MvccCoordinatorVersion newVer, boolean rmv) throws IgniteCheckedException { assert desc != null; - GridH2Row h2Row = desc.createRow(row); + GridH2Row h2Row = desc.createRow(row, newVer); if (rmv) return doUpdate(h2Row, true); @@ -454,6 +457,8 @@ public class GridH2Table extends TableBase { */ @SuppressWarnings("LockAcquiredButNotSafelyReleased") private boolean doUpdate(final GridH2Row row, boolean del) throws IgniteCheckedException { + assert !cctx.mvccEnabled() || row.mvccCounter() != CacheCoordinatorsProcessor.MVCC_COUNTER_NA : row; + // Here we assume that each key can't be updated concurrently and case when different indexes // getting updated from different threads with different rows with the same key is impossible. lock(false); @@ -466,10 +471,25 @@ public class GridH2Table extends TableBase { if (!del) { assert rowFactory == null || row.link() != 0 : row; - GridH2Row old = pk.put(row); // Put to PK. + GridH2Row old; + + // Put to PK. 
+ if (cctx.mvccEnabled()) { + boolean replaced = pk.putx(row); + + assert replaced == (row.newMvccCoordinatorVersion() != 0); - if (old == null) - size.increment(); + old = null; + + if (!replaced) + size.increment(); + } + else { + old = pk.put(row); + + if (old == null) + size.increment(); + } int len = idxs.size(); @@ -536,17 +556,24 @@ public class GridH2Table extends TableBase { private void addToIndex(GridH2IndexBase idx, Index pk, GridH2Row row, GridH2Row old, boolean tmp) { assert !idx.getIndexType().isUnique() : "Unique indexes are not supported: " + idx; - GridH2Row old2 = idx.put(row); + if (idx.ctx.mvccEnabled()) { + boolean replaced = idx.putx(row); - if (old2 != null) { // Row was replaced in index. - if (!tmp) { - if (!eq(pk, old2, old)) - throw new IllegalStateException("Row conflict should never happen, unique indexes are " + - "not supported [idx=" + idx + ", old=" + old + ", old2=" + old2 + ']'); + assert replaced == (row.newMvccCoordinatorVersion() != 0); + } + else { + GridH2Row old2 = idx.put(row); + + if (old2 != null) { // Row was replaced in index. + if (!tmp) { + if (!eq(pk, old2, old)) + throw new IllegalStateException("Row conflict should never happen, unique indexes are " + + "not supported [idx=" + idx + ", old=" + old + ", old2=" + old2 + ']'); + } } + else if (old != null) // Row was not replaced, need to remove manually. + idx.removex(old); } - else if (old != null) // Row was not replaced, need to remove manually. 
- idx.removex(old); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java index 77b928f..fe21b1d 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java @@ -54,6 +54,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionsReservation; import org.apache.ignite.internal.processors.cache.distributed.dht.GridReservable; +import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion; import org.apache.ignite.internal.processors.cache.query.CacheQueryType; import org.apache.ignite.internal.processors.cache.query.GridCacheQueryMarshallable; import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery; @@ -61,6 +62,7 @@ import org.apache.ignite.internal.processors.query.GridQueryCancel; import org.apache.ignite.internal.processors.query.h2.H2Utils; import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing; import org.apache.ignite.internal.processors.query.h2.UpdateResult; +import org.apache.ignite.internal.processors.query.h2.database.H2TreeMvccFilterClosure; import org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode; import org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryContext; import 
org.apache.ignite.internal.processors.query.h2.opt.GridH2RetryException; @@ -482,7 +484,8 @@ public class GridMapQueryExecutor { false, // Replicated is always false here (see condition above). req.timeout(), params, - true); // Lazy = true. + true, + req.mvccVersion()); // Lazy = true. } else { ctx.closure().callLocal( @@ -504,7 +507,8 @@ public class GridMapQueryExecutor { false, req.timeout(), params, - false); // Lazy = false. + false, + req.mvccVersion()); // Lazy = false. return null; } @@ -528,7 +532,8 @@ public class GridMapQueryExecutor { replicated, req.timeout(), params, - lazy); + lazy, + req.mvccVersion()); } /** @@ -544,6 +549,7 @@ public class GridMapQueryExecutor { * @param pageSize Page size. * @param distributedJoinMode Query distributed join mode. * @param lazy Streaming flag. + * @param mvccVer Mvcc version. */ private void onQueryRequest0( final ClusterNode node, @@ -561,7 +567,8 @@ public class GridMapQueryExecutor { final boolean replicated, final int timeout, final Object[] params, - boolean lazy + boolean lazy, + @Nullable final MvccCoordinatorVersion mvccVer ) { if (lazy && MapQueryLazyWorker.currentWorker() == null) { // Lazy queries must be re-submitted to dedicated workers. 
@@ -570,8 +577,24 @@ public class GridMapQueryExecutor { worker.submit(new Runnable() { @Override public void run() { - onQueryRequest0(node, reqId, segmentId, schemaName, qrys, cacheIds, topVer, partsMap, parts, - pageSize, distributedJoinMode, enforceJoinOrder, replicated, timeout, params, true); + onQueryRequest0( + node, + reqId, + segmentId, + schemaName, + qrys, + cacheIds, + topVer, + partsMap, + parts, + pageSize, + distributedJoinMode, + enforceJoinOrder, + replicated, + timeout, + params, + true, + mvccVer); } }); @@ -639,6 +662,9 @@ public class GridMapQueryExecutor { .topologyVersion(topVer) .reservations(reserved); + if (mvccVer != null) + qctx.mvccFilter(new H2TreeMvccFilterClosure(mvccVer)); + Connection conn = h2.connectionForSchema(schemaName); H2Utils.setupConnection(conn, distributedJoinMode != OFF, enforceJoinOrder); http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexIterator.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexIterator.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexIterator.java index 1c0efb3..4518d14 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexIterator.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexIterator.java @@ -25,8 +25,10 @@ import java.util.NoSuchElementException; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.processors.cache.mvcc.MvccQueryTracker; import org.h2.index.Cursor; import org.h2.result.Row; +import org.jetbrains.annotations.Nullable; /** * Iterator that transparently and sequentially traverses a 
bunch of {@link GridMergeIndex} objects. @@ -59,6 +61,9 @@ class GridMergeIndexIterator implements Iterator>, AutoCloseable { /** Whether remote resources were released. */ private boolean released; + /** */ + private MvccQueryTracker mvccTracker; + /** * Constructor. * @@ -69,14 +74,19 @@ class GridMergeIndexIterator implements Iterator>, AutoCloseable { * @param distributedJoins Distributed joins. * @throws IgniteCheckedException if failed. */ - GridMergeIndexIterator(GridReduceQueryExecutor rdcExec, Collection nodes, ReduceQueryRun run, - long qryReqId, boolean distributedJoins) + GridMergeIndexIterator(GridReduceQueryExecutor rdcExec, + Collection nodes, + ReduceQueryRun run, + long qryReqId, + boolean distributedJoins, + @Nullable MvccQueryTracker mvccTracker) throws IgniteCheckedException { this.rdcExec = rdcExec; this.nodes = nodes; this.run = run; this.qryReqId = qryReqId; this.distributedJoins = distributedJoins; + this.mvccTracker = mvccTracker; this.idxIter = run.indexes().iterator(); @@ -155,7 +165,7 @@ class GridMergeIndexIterator implements Iterator>, AutoCloseable { private void releaseIfNeeded() { if (!released) { try { - rdcExec.releaseRemoteResources(nodes, run, qryReqId, distributedJoins); + rdcExec.releaseRemoteResources(nodes, run, qryReqId, distributedJoins, mvccTracker); } finally { released = true; http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java index f85cd94..80b1970 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java +++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java @@ -59,6 +59,8 @@ import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState; +import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion; +import org.apache.ignite.internal.processors.cache.mvcc.MvccQueryTracker; import org.apache.ignite.internal.processors.cache.query.GridCacheQueryMarshallable; import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType; import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery; @@ -83,11 +85,13 @@ import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2QueryReq import org.apache.ignite.internal.util.GridIntIterator; import org.apache.ignite.internal.util.GridIntList; import org.apache.ignite.internal.util.GridSpinBusyLock; +import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.typedef.CIX2; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiClosure; +import org.apache.ignite.lang.IgniteBiInClosure; import org.apache.ignite.lang.IgniteFuture; import org.apache.ignite.plugin.extensions.communication.Message; import org.h2.command.ddl.CreateTableData; @@ -562,6 +566,31 @@ public class GridReduceQueryExecutor { List cacheIds = qry.cacheIds(); + MvccQueryTracker mvccTracker = null; + + // TODO IGNITE-3478. 
+ if (qry.mvccEnabled()) { + assert !cacheIds.isEmpty(); + + final GridFutureAdapter fut = new GridFutureAdapter<>(); + + mvccTracker = new MvccQueryTracker(cacheContext(cacheIds.get(0)), true, + new IgniteBiInClosure() { + @Override public void apply(AffinityTopologyVersion topVer, IgniteCheckedException e) { + fut.onDone(null, e); + } + }); + + mvccTracker.requestVersion(topVer); + + try { + fut.get(); + } + catch (IgniteCheckedException e) { + throw new CacheException(e); + } + } + Collection nodes = null; // Explicit partition mapping for unstable topology. @@ -730,6 +759,9 @@ public class GridReduceQueryExecutor { .timeout(timeoutMillis) .schemaName(schemaName); + if (mvccTracker != null) + req.mvccVersion(mvccTracker.mvccVersion()); + if (send(nodes, req, parts == null ? null : new ExplicitPartitionsSpecializer(qryMap), false)) { awaitAllReplies(r, nodes, cancel); @@ -763,7 +795,12 @@ public class GridReduceQueryExecutor { if (!retry) { if (skipMergeTbl) { - resIter = new GridMergeIndexIterator(this, finalNodes, r, qryReqId, qry.distributedJoins()); + resIter = new GridMergeIndexIterator(this, + finalNodes, + r, + qryReqId, + qry.distributedJoins(), + mvccTracker); release = false; } @@ -833,7 +870,7 @@ public class GridReduceQueryExecutor { } finally { if (release) { - releaseRemoteResources(finalNodes, r, qryReqId, qry.distributedJoins()); + releaseRemoteResources(finalNodes, r, qryReqId, qry.distributedJoins(), mvccTracker); if (!skipMergeTbl) { for (int i = 0, mapQrys = qry.mapQueries().size(); i < mapQrys; i++) @@ -1028,7 +1065,10 @@ public class GridReduceQueryExecutor { * @param distributedJoins Distributed join flag. */ public void releaseRemoteResources(Collection nodes, ReduceQueryRun r, long qryReqId, - boolean distributedJoins) { + boolean distributedJoins, MvccQueryTracker mvccTracker) { + if (mvccTracker != null) + mvccTracker.onQueryDone(); + // For distributedJoins need always send cancel request to cleanup resources. 
if (distributedJoins) send(nodes, new GridQueryCancelRequest(qryReqId), null, false); http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java index 4e1fadb..347b88c 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java @@ -31,6 +31,7 @@ import org.apache.ignite.internal.GridDirectTransient; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.binary.BinaryMarshaller; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion; import org.apache.ignite.internal.processors.cache.query.GridCacheQueryMarshallable; import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery; import org.apache.ignite.internal.processors.cache.query.QueryTable; @@ -42,6 +43,7 @@ import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType; import org.apache.ignite.plugin.extensions.communication.MessageReader; import org.apache.ignite.plugin.extensions.communication.MessageWriter; +import org.jetbrains.annotations.Nullable; import static org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery.EMPTY_PARAMS; @@ -133,6 +135,9 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable { /** Schema name. 
*/ private String schemaName; + /** */ + private MvccCoordinatorVersion mvccVer; + /** * Required by {@link Externalizable} */ @@ -157,6 +162,24 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable { params = req.params; paramsBytes = req.paramsBytes; schemaName = req.schemaName; + mvccVer = req.mvccVer; + } + + /** + * @return Mvcc version. + */ + @Nullable public MvccCoordinatorVersion mvccVersion() { + return mvccVer; + } + + /** + * @param mvccVer Mvcc version. + * @return {@code this}. + */ + public GridH2QueryRequest mvccVersion(MvccCoordinatorVersion mvccVer) { + this.mvccVer = mvccVer; + + return this; } /** @@ -435,65 +458,71 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable { writer.incrementState(); case 2: - if (!writer.writeInt("pageSize", pageSize)) + if (!writer.writeMessage("mvccVer", mvccVer)) return false; writer.incrementState(); case 3: - if (!writer.writeByteArray("paramsBytes", paramsBytes)) + if (!writer.writeInt("pageSize", pageSize)) return false; writer.incrementState(); case 4: - if (!writer.writeMap("parts", parts, MessageCollectionItemType.UUID, MessageCollectionItemType.INT_ARR)) + if (!writer.writeByteArray("paramsBytes", paramsBytes)) return false; writer.incrementState(); case 5: - if (!writer.writeCollection("qrys", qrys, MessageCollectionItemType.MSG)) + if (!writer.writeMap("parts", parts, MessageCollectionItemType.UUID, MessageCollectionItemType.INT_ARR)) return false; writer.incrementState(); case 6: - if (!writer.writeLong("reqId", reqId)) + if (!writer.writeIntArray("qryParts", qryParts)) return false; writer.incrementState(); case 7: - if (!writer.writeCollection("tbls", tbls, MessageCollectionItemType.MSG)) + if (!writer.writeCollection("qrys", qrys, MessageCollectionItemType.MSG)) return false; writer.incrementState(); case 8: - if (!writer.writeInt("timeout", timeout)) + if (!writer.writeLong("reqId", reqId)) return false; writer.incrementState(); case 9: - if 
(!writer.writeMessage("topVer", topVer)) + if (!writer.writeString("schemaName", schemaName)) return false; writer.incrementState(); - case 10: - if (!writer.writeIntArray("qryParts", qryParts)) + if (!writer.writeCollection("tbls", tbls, MessageCollectionItemType.MSG)) return false; writer.incrementState(); case 11: - if (!writer.writeString("schemaName", schemaName)) + if (!writer.writeInt("timeout", timeout)) return false; writer.incrementState(); + + case 12: + if (!writer.writeMessage("topVer", topVer)) + return false; + + writer.incrementState(); + } return true; @@ -524,7 +553,7 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable { reader.incrementState(); case 2: - pageSize = reader.readInt("pageSize"); + mvccVer = reader.readMessage("mvccVer"); if (!reader.isLastRead()) return false; @@ -532,7 +561,7 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable { reader.incrementState(); case 3: - paramsBytes = reader.readByteArray("paramsBytes"); + pageSize = reader.readInt("pageSize"); if (!reader.isLastRead()) return false; @@ -540,7 +569,7 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable { reader.incrementState(); case 4: - parts = reader.readMap("parts", MessageCollectionItemType.UUID, MessageCollectionItemType.INT_ARR, false); + paramsBytes = reader.readByteArray("paramsBytes"); if (!reader.isLastRead()) return false; @@ -548,7 +577,7 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable { reader.incrementState(); case 5: - qrys = reader.readCollection("qrys", MessageCollectionItemType.MSG); + parts = reader.readMap("parts", MessageCollectionItemType.UUID, MessageCollectionItemType.INT_ARR, false); if (!reader.isLastRead()) return false; @@ -556,7 +585,7 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable { reader.incrementState(); case 6: - reqId = reader.readLong("reqId"); + qryParts = 
reader.readIntArray("qryParts"); if (!reader.isLastRead()) return false; @@ -564,7 +593,7 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable { reader.incrementState(); case 7: - tbls = reader.readCollection("tbls", MessageCollectionItemType.MSG); + qrys = reader.readCollection("qrys", MessageCollectionItemType.MSG); if (!reader.isLastRead()) return false; @@ -572,7 +601,7 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable { reader.incrementState(); case 8: - timeout = reader.readInt("timeout"); + reqId = reader.readLong("reqId"); if (!reader.isLastRead()) return false; @@ -580,16 +609,15 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable { reader.incrementState(); case 9: - topVer = reader.readMessage("topVer"); + schemaName = reader.readString("schemaName"); if (!reader.isLastRead()) return false; reader.incrementState(); - case 10: - qryParts = reader.readIntArray("qryParts"); + tbls = reader.readCollection("tbls", MessageCollectionItemType.MSG); if (!reader.isLastRead()) return false; @@ -597,12 +625,21 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable { reader.incrementState(); case 11: - schemaName = reader.readString("schemaName"); + timeout = reader.readInt("timeout"); if (!reader.isLastRead()) return false; reader.incrementState(); + + case 12: + topVer = reader.readMessage("topVer"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + } return reader.afterMessageRead(GridH2QueryRequest.class); @@ -615,7 +652,7 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable { /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 12; + return 13; } /** {@inheritDoc} */