Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 31C6B200B4C for ; Fri, 22 Jul 2016 16:08:44 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 30573160A6D; Fri, 22 Jul 2016 14:08:44 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 9205F160A94 for ; Fri, 22 Jul 2016 16:08:41 +0200 (CEST) Received: (qmail 55877 invoked by uid 500); 22 Jul 2016 14:08:40 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 55623 invoked by uid 99); 22 Jul 2016 14:08:40 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 22 Jul 2016 14:08:40 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 77359E0844; Fri, 22 Jul 2016 14:08:40 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sboikov@apache.org To: commits@ignite.apache.org Date: Fri, 22 Jul 2016 14:08:47 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [08/11] ignite git commit: ignite-1232 Distributed SQL joins implementation archived-at: Fri, 22 Jul 2016 14:08:44 -0000 http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2MetaTable.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2MetaTable.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2MetaTable.java new file mode 100644 index 0000000..d8f2e08 --- /dev/null +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2MetaTable.java @@ -0,0 +1,383 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.h2.opt; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicLong; +import org.h2.command.ddl.CreateTableData; +import org.h2.engine.Session; +import org.h2.index.BaseIndex; +import org.h2.index.Cursor; +import org.h2.index.Index; +import org.h2.index.IndexCondition; +import org.h2.index.IndexType; +import org.h2.index.SingleRowCursor; +import org.h2.message.DbException; +import org.h2.result.Row; +import org.h2.result.SearchRow; +import org.h2.result.SortOrder; +import org.h2.table.Column; +import org.h2.table.IndexColumn; +import org.h2.table.TableBase; +import org.h2.table.TableFilter; +import org.h2.value.Value; +import org.h2.value.ValueInt; +import org.jsr166.ConcurrentHashMap8; + +/** + * Meta table. + */ +public class GridH2MetaTable extends TableBase { + /** */ + private static final int ID = 0; + + /** */ + private final MetaIndex index; + + /** */ + private final AtomicLong dataModificationId = new AtomicLong(); + + /** */ + private final Set fakeExclusiveSet = Collections.newSetFromMap( + new ConcurrentHashMap8()); + + /** + * @param data Data. + */ + public GridH2MetaTable(CreateTableData data) { + super(data); + + ArrayList cols = data.columns; + assert cols.size() == 4 : cols; + + Column id = cols.get(ID); + assert "ID".equals(id.getName()) && id.getType() == Value.INT : cols; + assert id.getColumnId() == ID; + + index = new MetaIndex(); + } + + /** {@inheritDoc} */ + @Override public Row getTemplateRow() { + return new MetaRow(); + } + + /** {@inheritDoc} */ + @Override public SearchRow getTemplateSimpleRow(boolean singleColumn) { + if (singleColumn) + return GridH2RowFactory.create((Value)null); + + return new MetaRow(); + } + + /** {@inheritDoc} */ + @Override public boolean lock(Session session, boolean exclusive, boolean forceLockEvenInMvcc) { + if (fakeExclusiveSet.contains(session)) + return true; + + if (exclusive) + fakeExclusiveSet.add(session); + + return false; + } + + /** {@inheritDoc} */ + @Override public void unlock(Session s) { + fakeExclusiveSet.remove(s); + } + + /** {@inheritDoc} */ + @Override public void close(Session session) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public Index addIndex(Session session, String indexName, int indexId, + IndexColumn[] cols, IndexType indexType, boolean create, String indexComment) { + assert cols.length == 1 : "len: " + cols.length; + + int colId = cols[0].column.getColumnId(); + assert colId == ID : "colId: " + colId; + + return index; + } + + /** {@inheritDoc} */ + @Override public void removeRow(Session session, Row row) { + dataModificationId.incrementAndGet(); + index.remove(session, row); + } + + /** {@inheritDoc} */ + @Override public void truncate(Session session) { + dataModificationId.incrementAndGet(); + index.truncate(session); + } + + /** {@inheritDoc} */ + @Override public void addRow(Session session, Row row) { + dataModificationId.incrementAndGet(); + index.add(session, row); + } + + /** {@inheritDoc} */ + @Override public void checkSupportAlter() { + throw DbException.getUnsupportedException("alter"); + } + + /** {@inheritDoc} */ + @Override public String getTableType() { + return SYSTEM_TABLE; + } + + /** {@inheritDoc} */ + @Override public Index getScanIndex(Session session) { + return index; + } + + /** {@inheritDoc} */ + @Override public Index getUniqueIndex() { + return index; + } + + /** {@inheritDoc} */ + @Override public ArrayList getIndexes() { + return null; + } + + /** {@inheritDoc} */ + @Override public boolean isLockedExclusively() { + return !fakeExclusiveSet.isEmpty(); + } + + /** {@inheritDoc} */ + @Override public boolean isLockedExclusivelyBy(Session s) { + return fakeExclusiveSet.contains(s); + } + + /** {@inheritDoc} */ + @Override public long getMaxDataModificationId() { + return dataModificationId.get(); + } + + /** {@inheritDoc} */ + @Override public boolean isDeterministic() { + return false; + } + + /** {@inheritDoc} */ + @Override public boolean canGetRowCount() { + return true; + } + + /** {@inheritDoc} */ + @Override public boolean canDrop() { + return false; + } + + /** {@inheritDoc} */ + @Override public long getRowCount(Session session) { + return index.getRowCount(session); + } + + /** {@inheritDoc} */ + @Override public long getRowCountApproximation() { + return index.getRowCountApproximation(); + } + + /** {@inheritDoc} */ + @Override public long getDiskSpaceUsed() { + return 0; + } + + /** {@inheritDoc} */ + @Override public void checkRename() { + throw DbException.getUnsupportedException("rename"); + } + + /** + * Get value row. + */ + private static class MetaRow extends GridH2Row { + /** */ + private Value v0; + + /** */ + private Value v1; + + /** */ + private Value v2; + + /** */ + private Value v3; + + /** {@inheritDoc} */ + @Override public int getColumnCount() { + return 4; + } + + /** {@inheritDoc} */ + @Override public Value getValue(int idx) { + switch (idx) { + case 0: + return v0; + + case 1: + return v1; + + case 2: + return v2; + + case 3: + return v3; + + default: + throw new IllegalStateException("Index: " + idx); + } + } + + /** {@inheritDoc} */ + @Override public void setValue(int idx, Value v) { + switch (idx) { + case 0: + v0 = v; + + break; + + case 1: + v1 = v; + + break; + + case 2: + v2 = v; + + break; + + case 3: + v3 = v; + + break; + + default: + throw new IllegalStateException("Index: " + idx); + } + } + } + + /** + * Met index. + */ + private static class MetaIndex extends BaseIndex { + /** */ + private final ConcurrentMap rows = new ConcurrentHashMap8<>(); + + /** {@inheritDoc} */ + @Override public void checkRename() { + throw DbException.getUnsupportedException("rename"); + } + + /** {@inheritDoc} */ + @Override public void close(Session session) { + // No-op. + } + + /** + * @param row Row. + * @return ID. + */ + private static ValueInt id(SearchRow row) { + Value id = row.getValue(ID); + + assert id != null; + + return (ValueInt)id; + } + + /** {@inheritDoc} */ + @Override public void add(Session session, Row row) { + rows.put(id(row), (GridH2Row)row); + } + + /** {@inheritDoc} */ + @Override public void remove(Session session, Row row) { + rows.remove(id(row), row); + } + + /** {@inheritDoc} */ + @Override public Cursor find(Session session, SearchRow first, SearchRow last) { + if (first == null || last == null || !Objects.equals(id(first), id(last))) + return new GridH2Cursor(rows.values().iterator()); + + return new SingleRowCursor(rows.get(id(first))); + } + + /** {@inheritDoc} */ + @Override public double getCost(Session session, int[] masks, TableFilter[] filters, + int filter, SortOrder sortOrder) { + if ((masks[ID] & IndexCondition.EQUALITY) == IndexCondition.EQUALITY) + return 1; + + return 1000 + rows.size(); + } + + /** {@inheritDoc} */ + @Override public void remove(Session session) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void truncate(Session session) { + rows.clear(); + } + + /** {@inheritDoc} */ + @Override public boolean canGetFirstOrLast() { + return false; + } + + /** {@inheritDoc} */ + @Override public Cursor findFirstOrLast(Session session, boolean first) { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ + @Override public boolean needRebuild() { + return false; + } + + /** {@inheritDoc} */ + @Override public long getRowCount(Session session) { + return rows.size(); + } + + /** {@inheritDoc} */ + @Override public long getRowCountApproximation() { + return getRowCount(null); + } + + /** {@inheritDoc} */ + @Override public long getDiskSpaceUsed() { + return 0; + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryContext.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryContext.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryContext.java new file mode 100644 index 0000000..19ea2b2 --- /dev/null +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryContext.java @@ -0,0 +1,612 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.h2.opt; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ConcurrentMap; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridReservable; +import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.spi.indexing.IndexingQueryFilter; +import org.jetbrains.annotations.Nullable; +import org.jsr166.ConcurrentHashMap8; + +import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.MAP; + +/** + * Thread local SQL query context which is intended to be accessible from everywhere. + */ +public class GridH2QueryContext { + /** */ + private static final ThreadLocal qctx = new ThreadLocal<>(); + + /** */ + private static final ConcurrentMap qctxs = new ConcurrentHashMap8<>(); + + /** */ + private final Key key; + + /** */ + private volatile boolean cleared; + + /** Index snapshots. */ + @GridToStringInclude + private Map snapshots; + + /** */ + private List reservations; + + /** Range streams for indexes. */ + private Map streams; + + /** Range sources for indexes. */ + private Map sources; + + /** */ + private int batchLookupIdGen; + + /** */ + private IndexingQueryFilter filter; + + /** */ + private AffinityTopologyVersion topVer; + + /** */ + private Map partsMap; + + /** */ + private UUID[] partsNodes; + + /** */ + private boolean distributedJoins; + + /** */ + private int pageSize; + + /** */ + private GridH2CollocationModel qryCollocationMdl; + + /** + * @param locNodeId Local node ID. + * @param nodeId The node who initiated the query. + * @param qryId The query ID. + * @param type Query type. + */ + public GridH2QueryContext(UUID locNodeId, UUID nodeId, long qryId, GridH2QueryType type) { + key = new Key(locNodeId, nodeId, qryId, type); + } + + /** + * @return Type. + */ + public GridH2QueryType type() { + return key.type; + } + + /** + * @return Origin node ID. + */ + public UUID originNodeId() { + return key.nodeId; + } + + /** + * @return Query request ID. + */ + public long queryId() { + return key.qryId; + } + + /** + * @return Query collocation model. + */ + public GridH2CollocationModel queryCollocationModel() { + return qryCollocationMdl; + } + + /** + * @param qryCollocationMdl Query collocation model. + */ + public void queryCollocationModel(GridH2CollocationModel qryCollocationMdl) { + this.qryCollocationMdl = qryCollocationMdl; + } + + /** + * @param distributedJoins Distributed joins can be run in this query. + * @return {@code this}. + */ + public GridH2QueryContext distributedJoins(boolean distributedJoins) { + this.distributedJoins = distributedJoins; + + return this; + } + + /** + * @return {@code true} If distributed joins can be run in this query. + */ + public boolean distributedJoins() { + return distributedJoins; + } + + /** + * @param reservations Reserved partitions or group reservations. + * @return {@code this}. + */ + public GridH2QueryContext reservations(List reservations) { + this.reservations = reservations; + + return this; + } + + /** + * @param topVer Topology version. + * @return {@code this}. + */ + public GridH2QueryContext topologyVersion(AffinityTopologyVersion topVer) { + this.topVer = topVer; + + return this; + } + + /** + * @return Topology version. + */ + public AffinityTopologyVersion topologyVersion() { + return topVer; + } + + /** + * @param partsMap Partitions map. + * @return {@code this}. + */ + public GridH2QueryContext partitionsMap(Map partsMap) { + this.partsMap = partsMap; + + return this; + } + + /** + * @return Partitions map. + */ + public Map partitionsMap() { + return partsMap; + } + + /** + * @param p Partition. + * @param cctx Cache context. + * @return Owning node ID. + */ + public UUID nodeForPartition(int p, GridCacheContext cctx) { + UUID[] nodeIds = partsNodes; + + if (nodeIds == null) { + assert partsMap != null; + + nodeIds = new UUID[cctx.affinity().partitions()]; + + for (Map.Entry e : partsMap.entrySet()) { + UUID nodeId = e.getKey(); + int[] nodeParts = e.getValue(); + + assert nodeId != null; + assert !F.isEmpty(nodeParts); + + for (int part : nodeParts) { + assert nodeIds[part] == null; + + nodeIds[part] = nodeId; + } + } + + partsNodes = nodeIds; + } + + return nodeIds[p]; + } + + /** + * @param idxId Index ID. + * @param snapshot Index snapshot. + */ + public void putSnapshot(long idxId, Object snapshot) { + assert snapshot != null; + assert get() == null : "need to snapshot indexes before setting query context for correct visibility"; + + if (snapshot instanceof GridReservable && !((GridReservable)snapshot).reserve()) + throw new IllegalStateException("Must be already reserved before."); + + if (snapshots == null) + snapshots = new HashMap<>(); + + if (snapshots.put(idxId, snapshot) != null) + throw new IllegalStateException("Index already snapshoted."); + } + + /** + * Clear taken snapshots. + */ + public void clearSnapshots() { + if (F.isEmpty(snapshots)) + return; + + for (Object snapshot : snapshots.values()) { + if (snapshot instanceof GridReservable) + ((GridReservable)snapshot).release(); + } + + snapshots = null; + } + + /** + * @param idxId Index ID. + * @return Index snapshot or {@code null} if none. + */ + @SuppressWarnings("unchecked") + public T getSnapshot(long idxId) { + if (snapshots == null) + return null; + + return (T)snapshots.get(idxId); + } + + /** + * @param batchLookupId Batch lookup ID. + * @param streams Range streams. + */ + public synchronized void putStreams(int batchLookupId, Object streams) { + if (this.streams == null) { + if (streams == null) + return; + + this.streams = new HashMap<>(); + } + + if (streams == null) + this.streams.remove(batchLookupId); + else + this.streams.put(batchLookupId, streams); + } + + /** + * @param batchLookupId Batch lookup ID. + * @return Range streams. + */ + @SuppressWarnings("unchecked") + public synchronized T getStreams(int batchLookupId) { + if (streams == null) + return null; + + return (T)streams.get(batchLookupId); + } + + /** + * @param ownerId Owner node ID. + * @param batchLookupId Batch lookup ID. + * @param src Range source. + */ + public synchronized void putSource(UUID ownerId, int batchLookupId, Object src) { + SourceKey srcKey = new SourceKey(ownerId, batchLookupId); + + if (src != null) { + if (sources == null) + sources = new HashMap<>(); + + sources.put(srcKey, src); + } + else if (sources != null) + sources.remove(srcKey); + } + + /** + * @param ownerId Owner node ID. + * @param batchLookupId Batch lookup ID. + * @return Range source. + */ + @SuppressWarnings("unchecked") + public synchronized T getSource(UUID ownerId, int batchLookupId) { + if (sources == null) + return null; + + return (T)sources.get(new SourceKey(ownerId, batchLookupId)); + } + + /** + * @return Next batch ID. + */ + public int nextBatchLookupId() { + return ++batchLookupIdGen; + } + + /** + * @return If indexes were snapshotted before query execution. + */ + public boolean hasIndexSnapshots() { + return snapshots != null; + } + + /** + * Sets current thread local context. This method must be called when all the non-volatile properties are + * already set to ensure visibility for other threads. + * + * @param x Query context. + */ + public static void set(GridH2QueryContext x) { + assert qctx.get() == null; + + // We need MAP query context to be available to other threads to run distributed joins. + if (x.key.type == MAP && x.distributedJoins() && qctxs.putIfAbsent(x.key, x) != null) + throw new IllegalStateException("Query context is already set."); + + qctx.set(x); + } + + /** + * Drops current thread local context. + */ + public static void clearThreadLocal() { + GridH2QueryContext x = qctx.get(); + + assert x != null; + + qctx.remove(); + } + + /** + * @param locNodeId Local node ID. + * @param nodeId The node who initiated the query. + * @param qryId The query ID. + * @param type Query type. + * @return {@code True} if context was found. + */ + public static boolean clear(UUID locNodeId, UUID nodeId, long qryId, GridH2QueryType type) { + return doClear(new Key(locNodeId, nodeId, qryId, type), false); + } + + /** + * @param key Context key. + * @param nodeStop Node is stopping. + * @return {@code True} if context was found. + */ + private static boolean doClear(Key key, boolean nodeStop) { + assert key.type == MAP : key.type; + + GridH2QueryContext x = qctxs.remove(key); + + if (x == null) + return false; + + assert x.key.equals(key); + + x.clearContext(nodeStop); + + return true; + } + + /** + * @param nodeStop Node is stopping. + */ + public void clearContext(boolean nodeStop) { + cleared = true; + + clearSnapshots(); + + List r = reservations; + + if (!nodeStop && !F.isEmpty(r)) { + for (int i = 0; i < r.size(); i++) + r.get(i).release(); + } + } + + /** + * @return {@code true} If the context is cleared. + */ + public boolean isCleared() { + return cleared; + } + + /** + * @param locNodeId Local node ID. + * @param nodeId Dead node ID. + */ + public static void clearAfterDeadNode(UUID locNodeId, UUID nodeId) { + for (Key key : qctxs.keySet()) { + if (key.locNodeId.equals(locNodeId) && key.nodeId.equals(nodeId)) + doClear(key, false); + } + } + + /** + * @param locNodeId Local node ID. + */ + public static void clearLocalNodeStop(UUID locNodeId) { + for (Key key : qctxs.keySet()) { + if (key.locNodeId.equals(locNodeId)) + doClear(key, true); + } + } + + /** + * Access current thread local query context (if it was set). + * + * @return Current thread local query context or {@code null} if the query runs outside of Ignite context. + */ + @Nullable public static GridH2QueryContext get() { + return qctx.get(); + } + + /** + * Access query context from another thread. + * + * @param locNodeId Local node ID. + * @param nodeId The node who initiated the query. + * @param qryId The query ID. + * @param type Query type. + * @return Query context. + */ + @Nullable public static GridH2QueryContext get( + UUID locNodeId, + UUID nodeId, + long qryId, + GridH2QueryType type + ) { + return qctxs.get(new Key(locNodeId, nodeId, qryId, type)); + } + + /** + * @return Filter. + */ + public IndexingQueryFilter filter() { + return filter; + } + + /** + * @param filter Filter. + * @return {@code this}. + */ + public GridH2QueryContext filter(IndexingQueryFilter filter) { + this.filter = filter; + + return this; + } + + /** + * @return Page size. + */ + public int pageSize() { + return pageSize; + } + + /** + * @param pageSize Page size. + * @return {@code this}. + */ + public GridH2QueryContext pageSize(int pageSize) { + this.pageSize = pageSize; + + return this; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridH2QueryContext.class, this); + } + + /** + * Unique key for the query context. + */ + private static class Key { + /** */ + private final UUID locNodeId; + + /** */ + private final UUID nodeId; + + /** */ + private final long qryId; + + /** */ + private final GridH2QueryType type; + + /** + * @param locNodeId Local node ID. + * @param nodeId The node who initiated the query. + * @param qryId The query ID. + * @param type Query type. + */ + private Key(UUID locNodeId, UUID nodeId, long qryId, GridH2QueryType type) { + assert locNodeId != null; + assert nodeId != null; + assert type != null; + + this.locNodeId = locNodeId; + this.nodeId = nodeId; + this.qryId = qryId; + this.type = type; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (o == null || getClass() != o.getClass()) + return false; + + Key key = (Key)o; + + return qryId == key.qryId && nodeId.equals(key.nodeId) && type == key.type && + locNodeId.equals(key.locNodeId) ; + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + int res = locNodeId.hashCode(); + + res = 31 * res + nodeId.hashCode(); + res = 31 * res + (int)(qryId ^ (qryId >>> 32)); + res = 31 * res + type.hashCode(); + + return res; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(Key.class, this); + } + } + + /** + * Key for source. + */ + private static final class SourceKey { + /** */ + UUID ownerId; + + /** */ + int batchLookupId; + + /** + * @param ownerId Owner node ID. + * @param batchLookupId Batch lookup ID. + */ + SourceKey(UUID ownerId, int batchLookupId) { + this.ownerId = ownerId; + this.batchLookupId = batchLookupId; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + SourceKey srcKey = (SourceKey)o; + + return batchLookupId == srcKey.batchLookupId && ownerId.equals(srcKey.ownerId); + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return 31 * ownerId.hashCode() + batchLookupId; + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryType.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryType.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryType.java new file mode 100644 index 0000000..f6d0408 --- /dev/null +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryType.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.h2.opt; + +/** + * Query type. + */ +public enum GridH2QueryType { + /** + * Map query. Runs over local partitions, possibly with distributed joins. + */ + MAP, + + /** + * Reduce query. Local query on a node which initiated the original query. + */ + REDUCE, + + /** + * Local query. It may be also a query over replicated cache but all the data is available locally. + */ + LOCAL, + + /** + * Replicated query over a network. Such a query can be sent from a client node or node which + * did not load all the partitions yet. + */ + REPLICATED, + + /** + * Parsing and optimization stage. + */ + PREPARE, +} http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2RetryException.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2RetryException.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2RetryException.java new file mode 100644 index 0000000..e333d71 --- /dev/null +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2RetryException.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.h2.opt; + +import org.apache.ignite.IgniteException; + +/** + * Internal exception. + */ +public class GridH2RetryException extends IgniteException { + /** + * @param msg Message. + */ + public GridH2RetryException(String msg) { + super(msg); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Row.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Row.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Row.java index 3b6b56e..9486a2e 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Row.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Row.java @@ -18,19 +18,14 @@ package org.apache.ignite.internal.processors.query.h2.opt; import org.h2.result.Row; +import org.h2.result.SearchRow; +import org.h2.store.Data; import org.h2.value.Value; /** * Row with locking support needed for unique key conflicts resolution. */ -public class GridH2Row extends Row implements GridSearchRowPointer { - /** - * @param data Column values. - */ - public GridH2Row(Value... data) { - super(data, MEMORY_CALCULATE); - } - +public abstract class GridH2Row extends Row implements GridSearchRowPointer { /** {@inheritDoc} */ @Override public long pointer() { throw new IllegalStateException(); @@ -45,4 +40,79 @@ public class GridH2Row extends Row implements GridSearchRowPointer { @Override public void decrementRefCount() { throw new IllegalStateException(); } + + /** {@inheritDoc} */ + @Override public Row getCopy() { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ + @Override public void setVersion(int version) { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ + @Override public int getByteCount(Data dummy) { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ + @Override public boolean isEmpty() { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ + @Override public void setDeleted(boolean deleted) { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ + @Override public void setSessionId(int sessionId) { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ + @Override public int getSessionId() { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ + @Override public void commit() { + // No-op. + } + + /** {@inheritDoc} */ + @Override public boolean isDeleted() { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ + @Override public void setKeyAndVersion(SearchRow old) { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ + @Override public int getVersion() { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ + @Override public void setKey(long key) { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ + @Override public long getKey() { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ + @Override public int getMemory() { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ + @Override public Value[] getValueList() { + throw new UnsupportedOperationException(); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2RowDescriptor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2RowDescriptor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2RowDescriptor.java index 80dcfcb..f519c30 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2RowDescriptor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2RowDescriptor.java @@ -18,7 +18,10 @@ package org.apache.ignite.internal.processors.query.h2.opt; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.internal.processors.cache.CacheObject; +import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor; import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing; import org.apache.ignite.internal.util.offheap.unsafe.GridOffHeapSmartPointerFactory; import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeGuard; @@ -31,9 +34,30 @@ import org.jetbrains.annotations.Nullable; */ public interface GridH2RowDescriptor extends GridOffHeapSmartPointerFactory { /** - * @return Owner. + * Gets indexing. + * + * @return indexing. + */ + public IgniteH2Indexing indexing(); + + /** + * Gets type descriptor. + * + * @return Type descriptor. + */ + public GridQueryTypeDescriptor type(); + + /** + * Gets cache context for this row descriptor. + * + * @return Cache context. + */ + public GridCacheContext context(); + + /** + * @return Cache configuration. */ - public IgniteH2Indexing owner(); + public CacheConfiguration configuration(); /** * Creates new row. http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2RowFactory.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2RowFactory.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2RowFactory.java new file mode 100644 index 0000000..00ff3f2 --- /dev/null +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2RowFactory.java @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.h2.opt; + +import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.h2.result.RowFactory; +import org.h2.value.Value; + +/** + * Row factory. + */ +public class GridH2RowFactory extends RowFactory { + /** + * @param v Value. + * @return Row. + */ + public static GridH2Row create(Value v) { + return new RowKey(v); + } + + /** + * @param v1 Value 1. + * @param v2 Value 2. + * @return Row. + */ + public static GridH2Row create(Value v1, Value v2) { + return new RowPair(v1, v2); + } + + /** + * @param data Values. + * @return Row. + */ + public static GridH2Row create(Value... data) { + switch (data.length) { + case 0: + throw new IllegalStateException("Zero columns row."); + + case 1: + return new RowKey(data[0]); + + case 2: + return new RowPair(data[0], data[1]); + + default: + return new RowSimple(data); + } + } + + /** {@inheritDoc} */ + @Override public GridH2Row createRow(Value[] data, int memory) { + return create(data); + } + + /** + * Single value row. + */ + private static final class RowKey extends GridH2Row { + /** */ + private Value key; + + /** + * @param key Key. + */ + public RowKey(Value key) { + this.key = key; + } + + /** {@inheritDoc} */ + @Override public int getColumnCount() { + return 1; + } + + /** {@inheritDoc} */ + @Override public Value getValue(int idx) { + assert idx == 0 : idx; + return key; + } + + /** {@inheritDoc} */ + @Override public void setValue(int idx, Value v) { + assert idx == 0 : idx; + key = v; + } + } + + /** + * Row of two values. + */ + private static final class RowPair extends GridH2Row { + /** */ + private Value v1; + + /** */ + private Value v2; + + /** + * @param v1 First value. + * @param v2 Second value. + */ + private RowPair(Value v1, Value v2) { + this.v1 = v1; + this.v2 = v2; + } + + /** {@inheritDoc} */ + @Override public int getColumnCount() { + return 2; + } + + /** {@inheritDoc} */ + @Override public Value getValue(int idx) { + return idx == 0 ? v1 : v2; + } + + /** {@inheritDoc} */ + @Override public void setValue(int idx, Value v) { + if (idx == 0) + v1 = v; + else { + assert idx == 1 : idx; + + v2 = v; + } + } + } + + /** + * Simple array based row. + */ + private static final class RowSimple extends GridH2Row { + /** */ + @GridToStringInclude + private Value[] vals; + + /** + * @param vals Values. + */ + private RowSimple(Value[] vals) { + this.vals = vals; + } + + /** {@inheritDoc} */ + @Override public int getColumnCount() { + return vals.length; + } + + /** {@inheritDoc} */ + @Override public Value getValue(int idx) { + return vals[idx]; + } + + /** {@inheritDoc} */ + @Override public void setValue(int idx, Value v) { + vals[idx] = v; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(RowSimple.class, this); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java index bea4dd8..8d080ae 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java @@ -23,22 +23,24 @@ import java.sql.Statement; import java.util.ArrayList; import java.util.Collections; import java.util.Set; -import java.util.concurrent.ConcurrentNavigableMap; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.IgniteException; +import org.apache.ignite.IgniteInterruptedException; import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory; import org.h2.api.TableEngine; import org.h2.command.ddl.CreateTableData; -import org.h2.engine.Constants; import org.h2.engine.Database; import org.h2.engine.DbObject; import org.h2.engine.Session; +import org.h2.index.BaseIndex; import org.h2.index.Cursor; import org.h2.index.Index; +import org.h2.index.IndexLookupBatch; import org.h2.index.IndexType; import org.h2.message.DbException; import org.h2.result.Row; @@ -55,7 +57,10 @@ import org.jetbrains.annotations.Nullable; import org.jsr166.ConcurrentHashMap8; import org.jsr166.LongAdder8; +import static org.apache.ignite.cache.CacheMode.PARTITIONED; +import static org.apache.ignite.internal.processors.query.h2.opt.GridH2AbstractKeyValueRow.KEY_COL; import static org.apache.ignite.internal.processors.query.h2.opt.GridH2AbstractKeyValueRow.VAL_COL; +import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.MAP; /** * H2 Table implementation. @@ -74,10 +79,16 @@ public class GridH2Table extends TableBase { private final ReadWriteLock lock; /** */ - private final Set sessions = Collections.newSetFromMap(new ConcurrentHashMap8()); + private boolean destroyed; /** */ - private volatile Object[] actualSnapshot; + private final Set sessions = Collections.newSetFromMap(new ConcurrentHashMap8()); + + /** */ + private final AtomicReference actualSnapshot = new AtomicReference<>(); + + /** */ + private IndexColumn affKeyCol; /** */ private final LongAdder8 size = new LongAdder8(); @@ -102,17 +113,57 @@ public class GridH2Table extends TableBase { this.desc = desc; this.spaceName = spaceName; + if (desc != null && desc.context() != null && !desc.context().customAffinityMapper()) { + boolean affinityColExists = true; + + String affKey = desc.type().affinityKey(); + + int affKeyColId = -1; + + if (affKey != null) { + String colName = desc.context().config().isSqlEscapeAll() ? affKey : affKey.toUpperCase(); + + if (doesColumnExist(colName)) + affKeyColId = getColumn(colName).getColumnId(); + else + affinityColExists = false; + } + else + affKeyColId = KEY_COL; + + if (affinityColExists) { + affKeyCol = indexColumn(affKeyColId, SortOrder.ASCENDING); + + assert affKeyCol != null; + } + } + + // Indexes must be created in the end when everything is ready. idxs = idxsFactory.createIndexes(this); assert idxs != null; assert idxs.size() >= 1; - lock = new ReentrantReadWriteLock(); - // Add scan index at 0 which is required by H2. idxs.add(0, new ScanIndex(index(0))); snapshotEnabled = desc == null || desc.snapshotableIndex(); + + lock = snapshotEnabled ? new ReentrantReadWriteLock() : null; + } + + /** + * @return {@code true} If this is a partitioned table. + */ + public boolean isPartitioned() { + return desc != null && desc.configuration().getCacheMode() == PARTITIONED; + } + + /** + * @return Affinity key column or {@code null} if not available. + */ + @Nullable public IndexColumn getAffinityKeyColumn() { + return affKeyCol; } /** {@inheritDoc} */ @@ -172,7 +223,7 @@ public class GridH2Table extends TableBase { GridUnsafeMemory mem = desc.memory(); - readLock(); + Lock l = lock(false, Long.MAX_VALUE); if (mem != null) desc.guard().begin(); @@ -191,7 +242,7 @@ public class GridH2Table extends TableBase { return true; } finally { - readUnlock(); + unlock(l); if (mem != null) desc.guard().end(); @@ -207,118 +258,219 @@ public class GridH2Table extends TableBase { /** {@inheritDoc} */ @SuppressWarnings({"LockAcquiredButNotSafelyReleased", "SynchronizationOnLocalVariableOrMethodParameter", "unchecked"}) - @Override public void lock(@Nullable final Session ses, boolean exclusive, boolean force) { + @Override public boolean lock(@Nullable final Session ses, boolean exclusive, boolean force) { if (ses != null) { if (!sessions.add(ses)) - return; + return false; - synchronized (ses) { - ses.addLock(this); - } + ses.addLock(this); } + if (snapshotInLock()) + snapshotIndexes(null); + + return false; + } + + /** + * @return {@code True} If we must snapshot and release index snapshots in {@link #lock(Session, boolean, boolean)} + * and {@link #unlock(Session)} methods. + */ + private boolean snapshotInLock() { + if (!snapshotEnabled) + return false; + + GridH2QueryContext qctx = GridH2QueryContext.get(); + + // On MAP queries with distributed joins we lock tables before the queries. + return qctx == null || qctx.type() != MAP || !qctx.hasIndexSnapshots(); + } + + /** + * @param qctx Query context. + */ + public void snapshotIndexes(GridH2QueryContext qctx) { if (!snapshotEnabled) return; - Object[] snapshot; + Object[] snapshots; - for (long waitTime = 100;; waitTime *= 2) { // Increase wait time to avoid starvation. - snapshot = actualSnapshot; + Lock l; - if (snapshot != null) { - // Reuse existing snapshot without locking. - for (int i = 1, len = idxs.size(); i < len; i++) - index(i).takeSnapshot(snapshot[i - 1]); + // Try to reuse existing snapshots outside of the lock. + for (long waitTime = 200;; waitTime *= 2) { // Increase wait time to avoid starvation. + snapshots = actualSnapshot.get(); - return; - } + if (snapshots != null) { // Reuse existing snapshot without locking. + snapshots = doSnapshotIndexes(snapshots, qctx); - try { - if (lock.writeLock().tryLock(waitTime, TimeUnit.MILLISECONDS)) - break; - } - catch (InterruptedException e) { - throw new IgniteException("Thread got interrupted while trying to acquire index lock.", e); + if (snapshots != null) + return; // Reused successfully. } - } - boolean snapshoted = false; + l = lock(true, waitTime); + + if (l != null) + break; + } try { - snapshot = actualSnapshot; // Try again inside of the lock. + // Try again inside of the lock. + snapshots = actualSnapshot.get(); + + if (snapshots != null) // Try reusing. + snapshots = doSnapshotIndexes(snapshots, qctx); - if (snapshot == null) { - snapshot = takeIndexesSnapshot(); + if (snapshots == null) { // Reuse failed, produce new snapshots. + snapshots = doSnapshotIndexes(null, qctx); - if (desc == null || desc.memory() == null) // This optimization is disabled for off-heap index. - actualSnapshot = snapshot; + assert snapshots != null; - snapshoted = true; + actualSnapshot.set(snapshots); } } finally { - lock.writeLock().unlock(); + unlock(l); } + } - if (!snapshoted) { - for (int i = 1, len = idxs.size(); i < len; i++) - index(i).takeSnapshot(snapshot[i - 1]); + /** + * @return Table identifier. + */ + public String identifier() { + return getSchema().getName() + '.' + getName(); + } + + /** + * @param l Lock. + */ + private static void unlock(Lock l) { + if (l != null) + l.unlock(); + } + + /** + * @param exclusive Exclusive lock. + * @param waitMillis Milliseconds to wait for the lock. + * @return The acquired lock or {@code null} if the lock time out occurred. + */ + public Lock lock(boolean exclusive, long waitMillis) { + if (!snapshotEnabled) + return null; + + Lock l = exclusive ? lock.writeLock() : lock.readLock(); + + try { + if (!l.tryLock(waitMillis, TimeUnit.MILLISECONDS)) + return null; + } + catch (InterruptedException e) { + throw new IgniteInterruptedException("Thread got interrupted while trying to acquire table lock.", e); } + + if (destroyed) { + unlock(l); + + throw new IllegalStateException("Table " + identifier() + " already destroyed."); + } + + return l; } /** * Must be called inside of write lock because when using multiple indexes we have to ensure that all of them have * the same contents at snapshot taking time. * + * @param qctx Query context. * @return New indexes data snapshot. */ @SuppressWarnings("unchecked") - private Object[] takeIndexesSnapshot() { - int len = idxs.size(); + private Object[] doSnapshotIndexes(Object[] snapshots, GridH2QueryContext qctx) { + assert snapshotEnabled; - Object[] snapshot = new ConcurrentNavigableMap[len - 1]; + if (snapshots == null) // Nothing to reuse, create new snapshots. + snapshots = new Object[idxs.size() - 1]; - for (int i = 1; i < len; i++) { // Take snapshots on all except first which is scan. - Object s = index(i).takeSnapshot(null); + // Take snapshots on all except first which is scan. + for (int i = 1, len = idxs.size(); i < len; i++) { + Object s = snapshots[i - 1]; - snapshot[i - 1] = s; - } + boolean reuseExisting = s != null; - return snapshot; - } + s = index(i).takeSnapshot(s, qctx); - /** {@inheritDoc} */ - @Override public void close(Session ses) { - assert !sessions.contains(ses); - } + if (reuseExisting && s == null) { // Existing snapshot was invalidated before we were able to reserve it. + // Release already taken snapshots. + if (qctx != null) + qctx.clearSnapshots(); - /** {@inheritDoc} */ - @Override public void unlock(@Nullable Session ses) { - if (ses != null) { - boolean res = sessions.remove(ses); + for (int j = 1; j < i; j++) + index(j).releaseSnapshot(); + + // Drop invalidated snapshot. + actualSnapshot.compareAndSet(snapshots, null); + + return null; + } - assert res; + snapshots[i - 1] = s; } - for (int i = 1, len = idxs.size(); i < len; i++) // Release snapshots on all except first which is scan. - index(i).releaseSnapshot(); + return snapshots; + } + + /** {@inheritDoc} */ + @Override public void close(Session ses) { + // No-op. } /** - * Closes table and releases resources. + * Destroy the table. */ - public void close() { - writeLock(); + public void destroy() { + Lock l = lock(true, Long.MAX_VALUE); try { + assert sessions.isEmpty() : sessions; + + destroyed = true; + for (int i = 1, len = idxs.size(); i < len; i++) - index(i).close(null); + index(i).destroy(); } finally { - writeUnlock(); + unlock(l); } } + /** {@inheritDoc} */ + @Override public void unlock(@Nullable Session ses) { + if (ses != null && !sessions.remove(ses)) + return; + + if (snapshotInLock()) + releaseSnapshots(); + } + + /** + * Releases snapshots. + */ + public void releaseSnapshots() { + if (!snapshotEnabled) + return; + + releaseSnapshots0(idxs); + } + + /** + * @param idxs Indexes. + */ + private void releaseSnapshots0(ArrayList idxs) { + // Release snapshots on all except first which is scan. + for (int i = 1, len = idxs.size(); i < len; i++) + ((GridH2IndexBase)idxs.get(i)).releaseSnapshot(); + } + /** * Updates table for given key. If value is null then row with given key will be removed from table, * otherwise value and expiration time will be updated or new row will be added. @@ -336,7 +488,16 @@ public class GridH2Table extends TableBase { GridH2Row row = desc.createRow(key, val, expirationTime); - return doUpdate(row, rmv); + if (!rmv) + ((GridH2AbstractKeyValueRow)row).valuesCache(new Value[getColumns().length]); + + try { + return doUpdate(row, rmv); + } + finally { + if (!rmv) + ((GridH2AbstractKeyValueRow)row).valuesCache(null); + } } /** @@ -372,7 +533,7 @@ public class GridH2Table extends TableBase { // getting updated from different threads with different rows with the same key is impossible. GridUnsafeMemory mem = desc == null ? null : desc.memory(); - readLock(); + Lock l = lock(false, Long.MAX_VALUE); if (mem != null) desc.guard().begin(); @@ -400,7 +561,7 @@ public class GridH2Table extends TableBase { while (++i < len) { GridH2IndexBase idx = index(i); - assert !idx.getIndexType().isUnique() : "Unique indexes are not supported."; + assert !idx.getIndexType().isUnique() : "Unique indexes are not supported: " + idx; GridH2Row old2 = idx.put(row); @@ -417,7 +578,7 @@ public class GridH2Table extends TableBase { // index(1) is PK, get full row from there (search row here contains only key but no other columns). GridH2Row old = pk.remove(row); - if (old instanceof GridH2AbstractKeyValueRow) { // Unswap value. + if (row.getColumnCount() != 1 && old instanceof GridH2AbstractKeyValueRow) { // Unswap value. Value v = row.getValue(VAL_COL); if (v != null) @@ -440,12 +601,12 @@ public class GridH2Table extends TableBase { } // The snapshot is not actual after update. - actualSnapshot = null; + actualSnapshot.set(null); return true; } finally { - readUnlock(); + unlock(l); if (mem != null) desc.guard().end(); @@ -485,13 +646,12 @@ public class GridH2Table extends TableBase { if (!snapshotEnabled) return; - GridUnsafeMemory memory = desc == null ? null : desc.memory(); + Lock l = lock(true, Long.MAX_VALUE); - lock.writeLock().lock(); + ArrayList idxs0 = new ArrayList<>(idxs); try { - if (memory == null && actualSnapshot == null) - actualSnapshot = takeIndexesSnapshot(); // Allow read access while we are rebuilding indexes. + snapshotIndexes(null); // Allow read access while we are rebuilding indexes. for (int i = 1, len = idxs.size(); i < len; i++) { GridH2IndexBase newIdx = index(i).rebuild(); @@ -502,13 +662,13 @@ public class GridH2Table extends TableBase { idxs.set(0, new ScanIndex(newIdx)); } } - catch (InterruptedException ignored) { - // No-op. + catch (InterruptedException e) { + throw new IgniteInterruptedException(e); } finally { - lock.writeLock().unlock(); + releaseSnapshots0(idxs0); - actualSnapshot = null; + unlock(l); } } @@ -621,38 +781,6 @@ public class GridH2Table extends TableBase { } /** - * - */ - private void readLock() { - if (snapshotEnabled) - lock.readLock().lock(); - } - - /** - * - */ - private void readUnlock() { - if (snapshotEnabled) - lock.readLock().unlock(); - } - - /** - * - */ - private void writeLock() { - if (snapshotEnabled) - lock.writeLock().lock(); - } - - /** - * - */ - private void writeUnlock() { - if (snapshotEnabled) - lock.writeLock().unlock(); - } - - /** * H2 Table engine. */ @SuppressWarnings({"PublicInnerClass", "FieldAccessedSynchronizedAndUnsynchronized"}) @@ -729,7 +857,7 @@ public class GridH2Table extends TableBase { * Wrapper type for primary key. */ @SuppressWarnings("PackageVisibleInnerClass") - static class ScanIndex implements Index { + static class ScanIndex extends BaseIndex { /** */ static final String SCAN_INDEX_NAME_SUFFIX = "__SCAN_"; @@ -774,8 +902,8 @@ public class GridH2Table extends TableBase { } /** {@inheritDoc} */ - @Override public void close(Session ses) { - delegate.close(ses); + @Override public final void close(Session ses) { + // No-op. } /** {@inheritDoc} */ @@ -819,8 +947,13 @@ public class GridH2Table extends TableBase { } /** {@inheritDoc} */ - @Override public double getCost(Session ses, int[] masks, TableFilter tblFilter, SortOrder sortOrder) { - return getRowCountApproximation() + Constants.COST_ROW_OFFSET; + @Override public double getCost(Session ses, int[] masks, TableFilter[] filters, int filter, + SortOrder sortOrder) { + long rows = getRowCountApproximation(); + double baseCost = getCostRangeIndex(masks, rows, filters, filter, sortOrder, true); + int mul = delegate.getDistributedMultiplier(ses, filters, filter); + + return mul * baseCost; } /** {@inheritDoc} */ @@ -884,6 +1017,11 @@ public class GridH2Table extends TableBase { } /** {@inheritDoc} */ + @Override public IndexLookupBatch createLookupBatch(TableFilter filter) { + return delegate.createLookupBatch(filter); + } + + /** {@inheritDoc} */ @Override public void truncate(Session ses) { // No-op. } http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java index 2c95b66..33aaf7b 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java @@ -17,10 +17,9 @@ package org.apache.ignite.internal.processors.query.h2.opt; -import java.io.Closeable; -import java.util.Arrays; import java.util.Comparator; import java.util.Iterator; +import java.util.List; import java.util.NavigableMap; import java.util.concurrent.ConcurrentNavigableMap; import java.util.concurrent.ConcurrentSkipListMap; @@ -28,14 +27,14 @@ import org.apache.ignite.internal.util.GridEmptyIterator; import org.apache.ignite.internal.util.offheap.unsafe.GridOffHeapSnapTreeMap; import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeGuard; import org.apache.ignite.internal.util.snaptree.SnapTreeMap; +import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.SB; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.spi.indexing.IndexingQueryFilter; import org.h2.engine.Session; import org.h2.index.Cursor; import org.h2.index.IndexType; -import org.h2.index.SingleRowCursor; -import org.h2.result.Row; +import org.h2.message.DbException; import org.h2.result.SearchRow; import org.h2.result.SortOrder; import org.h2.table.IndexColumn; @@ -49,10 +48,7 @@ import org.jetbrains.annotations.Nullable; @SuppressWarnings("ComparatorNotSerializable") public class GridH2TreeIndex extends GridH2IndexBase implements Comparator { /** */ - protected final ConcurrentNavigableMap tree; - - /** */ - private final ThreadLocal> snapshot = new ThreadLocal<>(); + private final ConcurrentNavigableMap tree; /** */ private final boolean snapshotEnabled; @@ -63,19 +59,11 @@ public class GridH2TreeIndex extends GridH2IndexBase implements Comparator colsList) { + IndexColumn[] cols = colsList.toArray(new IndexColumn[colsList.size()]); IndexColumn.mapColumns(cols, tbl); @@ -146,61 +134,25 @@ public class GridH2TreeIndex extends GridH2IndexBase implements Comparator)s); - return s; + initDistributedJoinMessaging(tbl); } - /** - * Releases snapshot for current thread. - */ - @Override public void releaseSnapshot() { - if (!snapshotEnabled) - return; - - ConcurrentNavigableMap s = snapshot.get(); - - snapshot.set(null); + /** {@inheritDoc} */ + @Override protected Object doTakeSnapshot() { + assert snapshotEnabled; - if (s instanceof Closeable) - U.closeQuiet((Closeable)s); + return tree instanceof SnapTreeMap ? + ((SnapTreeMap)tree).clone() : + ((GridOffHeapSnapTreeMap)tree).clone(); } - /** - * @return Snapshot for current thread if there is one. - */ - private ConcurrentNavigableMap treeForRead() { + /** {@inheritDoc} */ + protected final ConcurrentNavigableMap treeForRead() { if (!snapshotEnabled) return tree; - ConcurrentNavigableMap res = snapshot.get(); + ConcurrentNavigableMap res = threadLocalSnapshot(); if (res == null) res = tree; @@ -209,19 +161,21 @@ public class GridH2TreeIndex extends GridH2IndexBase implements Comparator iter = doFind(null, false, null); @@ -269,13 +223,17 @@ public class GridH2TreeIndex extends GridH2IndexBase implements Comparator doFind(@Nullable SearchRow first, boolean includeFirst, - @Nullable SearchRow last) { + private Iterator doFind(@Nullable SearchRow first, boolean includeFirst, @Nullable SearchRow last) { ConcurrentNavigableMap t = treeForRead(); + return doFind0(t, first, includeFirst, last, threadLocalFilter()); + } + + /** {@inheritDoc} */ + @Override protected final Iterator doFind0( + ConcurrentNavigableMap t, + @Nullable SearchRow first, + boolean includeFirst, + @Nullable SearchRow last, + IndexingQueryFilter filter + ) { includeFirst &= first != null; NavigableMap range = subTree(t, comparable(first, includeFirst ? -1 : 1), @@ -321,7 +289,7 @@ public class GridH2TreeIndex extends GridH2IndexBase implements Comparator(); - return filter(range.values().iterator()); + return filter(range.values().iterator(), filter); } /** @@ -381,21 +349,12 @@ public class GridH2TreeIndex extends GridH2IndexBase implements Comparator tree = treeForRead(); - - Iterator iter = filter(first ? tree.values().iterator() : tree.descendingMap().values().iterator()); - - GridSearchRowPointer res = null; - - if (iter.hasNext()) - res = iter.next(); - - return new SingleRowCursor((Row)res); + throw DbException.throwInternalError(); } /** {@inheritDoc} */ @@ -503,11 +462,8 @@ public class GridH2TreeIndex extends GridH2IndexBase implements Comparator localCalendar = new ThreadLocal<>(); - - /** - * @return The instance of calendar for local thread. - */ - public static Calendar getLocalCalendar() { - Calendar res = localCalendar.get(); - - if (res == null) { - res = (Calendar)staticCalendar.clone(); - - localCalendar.set(res); - } - - return res; - } - - /** - * Get or create a timestamp value for the given timestamp. - * - * Copy/pasted from org.h2.value.ValueTimestamp#get(java.sql.Timestamp) - * - * @param timestamp The timestamp. - * @return The value. - */ - public static ValueTimestamp toValueTimestamp(Timestamp timestamp) { - long ms = timestamp.getTime(); - long nanos = timestamp.getNanos() % 1000000; - - Calendar calendar = getLocalCalendar(); - - calendar.clear(); - calendar.setTimeInMillis(ms); - - long dateValue = dateValueFromCalendar(calendar); - - nanos += nanosFromCalendar(calendar); - - return ValueTimestamp.fromDateValueAndNanos(dateValue, nanos); - } - - /** - * Calculate the nanoseconds since midnight from a given calendar. - * - * Copy/pasted from org.h2.util.DateTimeUtils#nanosFromCalendar(java.util.Calendar). - * - * @param cal The calendar. - * @return Nanoseconds. - */ - private static long nanosFromCalendar(Calendar cal) { - int h = cal.get(Calendar.HOUR_OF_DAY); - int m = cal.get(Calendar.MINUTE); - int s = cal.get(Calendar.SECOND); - int millis = cal.get(Calendar.MILLISECOND); - - return ((((((h * 60L) + m) * 60) + s) * 1000) + millis) * 1000000; - } - - /** - * Calculate the date value from a given calendar. - * - * Copy/pasted from org.h2.util.DateTimeUtils#dateValueFromCalendar(java.util.Calendar) - * - * @param cal The calendar. - * @return The date value. - */ - private static long dateValueFromCalendar(Calendar cal) { - int year, month, day; - - year = getYear(cal); - month = cal.get(Calendar.MONTH) + 1; - day = cal.get(Calendar.DAY_OF_MONTH); - - return ((long) year << SHIFT_YEAR) | (month << SHIFT_MONTH) | day; - } - - /** - * Get the year (positive or negative) from a calendar. - * - * Copy/pasted from org.h2.util.DateTimeUtils#getYear(java.util.Calendar) - * - * @param calendar The calendar. - * @return The year. - */ - private static int getYear(Calendar calendar) { - int year = calendar.get(Calendar.YEAR); - - if (calendar.get(Calendar.ERA) == GregorianCalendar.BC) { - year = 1 - year; - } - - return year; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2ValueCacheObject.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2ValueCacheObject.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2ValueCacheObject.java index 29f9675..80e8504 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2ValueCacheObject.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2ValueCacheObject.java @@ -27,6 +27,7 @@ import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.CacheObjectContext; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.h2.message.DbException; +import org.h2.util.JdbcUtils; import org.h2.util.Utils; import org.h2.value.CompareMode; import org.h2.value.Value; @@ -117,7 +118,7 @@ public class GridH2ValueCacheObject extends Value { } // For user-provided and array types. - return Utils.serialize(obj, null); + return JdbcUtils.serialize(obj, null); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridLuceneIndex.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridLuceneIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridLuceneIndex.java index 957e5f6..716c9cb 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridLuceneIndex.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridLuceneIndex.java @@ -17,7 +17,6 @@ package org.apache.ignite.internal.processors.query.h2.opt; -import java.io.Closeable; import java.io.IOException; import java.util.Collection; import java.util.concurrent.atomic.AtomicLong; @@ -51,7 +50,7 @@ import org.apache.lucene.search.ScoreDoc; import org.apache.lucene.search.TermRangeFilter; import org.apache.lucene.search.TopDocs; import org.apache.lucene.util.Version; -import org.h2.util.Utils; +import org.h2.util.JdbcUtils; import org.jetbrains.annotations.Nullable; import static org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing.KEY_FIELD_NAME; @@ -60,7 +59,7 @@ import static org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing.VA /** * Lucene fulltext index. */ -public class GridLuceneIndex implements Closeable { +public class GridLuceneIndex implements AutoCloseable { /** Field name for string representation of value. */ public static final String VAL_STR_FIELD_NAME = "_gg_val_str__"; @@ -363,7 +362,7 @@ public class GridLuceneIndex implements Closeable { @SuppressWarnings("unchecked") private Z unmarshall(byte[] bytes, ClassLoader ldr) throws IgniteCheckedException { if (coctx == null) // For tests. - return (Z)Utils.deserialize(bytes, null); + return (Z)JdbcUtils.deserialize(bytes, null); return (Z)coctx.processor().unmarshal(coctx, bytes, ldr); } http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlAlias.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlAlias.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlAlias.java index 245b88e..3fb6f3c 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlAlias.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlAlias.java @@ -52,6 +52,18 @@ public class GridSqlAlias extends GridSqlElement { this.alias = alias; } + /** + * @param el Element. + * @return Unwrapped from alias element. + */ + public static GridSqlElement unwrap(GridSqlElement el) { + el = el instanceof GridSqlAlias ? el.child() : el; + + assert el != null; + + return el; + } + /** {@inheritDoc} */ @Override public String getSQL() { return child().getSQL() + (useAs ? " AS " : " ") + Parser.quoteIdentifier(alias); http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlColumn.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlColumn.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlColumn.java index aca1398..6ef4446 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlColumn.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlColumn.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.processors.query.h2.sql; import java.util.Collections; +import org.h2.table.Column; /** * Column. @@ -32,12 +33,16 @@ public class GridSqlColumn extends GridSqlElement implements GridSqlValue { /** SQL from original query. May be qualified or unqualified column name. */ private final String sqlText; + /** */ + private Column col; + /** + * @param col Column. * @param from From. * @param name Name. * @param sqlText Text. */ - public GridSqlColumn(GridSqlElement from, String name, String sqlText) { + public GridSqlColumn(Column col, GridSqlElement from, String name, String sqlText) { super(Collections.emptyList()); assert sqlText != null; @@ -45,13 +50,7 @@ public class GridSqlColumn extends GridSqlElement implements GridSqlValue { expressionInFrom = from; colName = name; this.sqlText = sqlText; - } - - /** - * @return Simple unqualified column with only name. - */ - public GridSqlColumn simplify() { - return new GridSqlColumn(null, colName, colName); + this.col = col; } /** @@ -72,4 +71,11 @@ public class GridSqlColumn extends GridSqlElement implements GridSqlValue { public GridSqlElement expressionInFrom() { return expressionInFrom; } + + /** + * @return H2 Column. + */ + public Column column() { + return col; + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlElement.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlElement.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlElement.java index 57d3c57..d95c14a 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlElement.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlElement.java @@ -119,4 +119,15 @@ public abstract class GridSqlElement implements Iterable { @Override public String toString() { return getSQL(); } + + /** {@inheritDoc} */ + @Override public int hashCode() { + throw new IllegalStateException(); + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + return this == o || (!(o == null || getClass() != o.getClass()) && + children.equals(((GridSqlElement)o).children)); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlOperation.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlOperation.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlOperation.java index 18d8bdf..737c5b1 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlOperation.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlOperation.java @@ -63,7 +63,7 @@ public class GridSqlOperation extends GridSqlElement { /** * @return Operation type. */ - public GridSqlOperationType opType() { + public GridSqlOperationType operationType() { return opType; }