ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [01/15] ignite git commit: IGNITE-6931: SQL: simplify index rebuild. Now both CREATE INDEX and "rebuild from hash" routines use the same iteration and entry locking logic. This closes #3052.
Date Wed, 22 Nov 2017 10:23:17 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-zk 4090eb717 -> a6b452823


IGNITE-6931: SQL: simplify index rebuild. Now both CREATE INDEX and "rebuild from hash" routines
use the same iteration and entry locking logic. This closes #3052.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/65080675
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/65080675
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/65080675

Branch: refs/heads/ignite-zk
Commit: 65080675dec8433c0968762c903905d1f65e5f29
Parents: db343b6
Author: Igor Seliverstov <gvvinblade@gmail.com>
Authored: Tue Nov 21 12:26:23 2017 +0300
Committer: devozerov <vozerov@gridgain.com>
Committed: Tue Nov 21 12:26:23 2017 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheEntryEx.java      |  13 +--
 .../processors/cache/GridCacheMapEntry.java     |  17 +--
 .../cache/IgniteCacheOffheapManager.java        |  19 ----
 .../cache/IgniteCacheOffheapManagerImpl.java    |  21 ----
 .../persistence/GridCacheOffheapManager.java    |   7 --
 .../processors/query/GridQueryProcessor.java    |  51 ++++++++-
 .../query/schema/SchemaIndexCacheFilter.java    |  33 ++++++
 .../schema/SchemaIndexCacheVisitorImpl.java     | 108 +++++++++----------
 .../processors/cache/GridCacheTestEntryEx.java  |  10 +-
 .../processors/query/h2/IgniteH2Indexing.java   |  64 ++++-------
 10 files changed, 166 insertions(+), 177 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/65080675/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
index 6da7bc7..313a0c7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
@@ -34,6 +34,7 @@ import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersionedEntryEx;
 import org.apache.ignite.internal.processors.dr.GridDrType;
+import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheFilter;
 import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitorClosure;
 import org.apache.ignite.internal.util.lang.GridTuple3;
 import org.jetbrains.annotations.Nullable;
@@ -869,12 +870,13 @@ public interface GridCacheEntryEx {
     /**
      * Update index from within entry lock, passing key, value, and expiration time to provided
closure.
      *
+     * @param filter Row filter.
      * @param clo Closure to apply to key, value, and expiration time.
      * @throws IgniteCheckedException If failed.
      * @throws GridCacheEntryRemovedException If entry was removed.
      */
-    public void updateIndex(SchemaIndexCacheVisitorClosure clo) throws IgniteCheckedException,
-        GridCacheEntryRemovedException;
+    public void updateIndex(SchemaIndexCacheFilter filter, SchemaIndexCacheVisitorClosure
clo)
+        throws IgniteCheckedException, GridCacheEntryRemovedException;
 
     /**
      * @return Expire time, without accounting for transactions or removals.
@@ -919,13 +921,6 @@ public interface GridCacheEntryEx {
     public void updateTtl(@Nullable GridCacheVersion ver, long ttl) throws GridCacheEntryRemovedException;
 
     /**
-     * Ensures that the value stored in the entry is also inserted in the indexing.
-     *
-     * @throws GridCacheEntryRemovedException If entry was removed.
-     */
-    public void ensureIndexed() throws GridCacheEntryRemovedException, IgniteCheckedException;
-
-    /**
      * @return Value.
      * @throws IgniteCheckedException If failed to read from swap storage.
      * @throws GridCacheEntryRemovedException If entry was removed.

http://git-wip-us.apache.org/repos/asf/ignite/blob/65080675/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 778a46e..79390ed 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -60,6 +60,7 @@ import org.apache.ignite.internal.processors.cache.version.GridCacheVersionConfl
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersionEx;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersionedEntryEx;
 import org.apache.ignite.internal.processors.dr.GridDrType;
+import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheFilter;
 import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitorClosure;
 import org.apache.ignite.internal.util.IgniteTree;
 import org.apache.ignite.internal.util.lang.GridClosureException;
@@ -3141,16 +3142,6 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter
impleme
     }
 
     /** {@inheritDoc} */
-    @Override public void ensureIndexed() throws GridCacheEntryRemovedException, IgniteCheckedException
{
-        synchronized (this) {
-            checkObsolete();
-
-            if (cctx.queries().enabled())
-                cctx.offheap().updateIndexes(cctx, key, localPartition());
-        }
-    }
-
-    /** {@inheritDoc} */
     @Override public synchronized CacheObject valueBytes() throws GridCacheEntryRemovedException
{
         checkObsolete();
 
@@ -3300,8 +3291,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter
impleme
     }
 
     /** {@inheritDoc} */
-    @Override public void updateIndex(SchemaIndexCacheVisitorClosure clo) throws IgniteCheckedException,
-        GridCacheEntryRemovedException {
+    @Override public void updateIndex(SchemaIndexCacheFilter filter, SchemaIndexCacheVisitorClosure
clo)
+        throws IgniteCheckedException, GridCacheEntryRemovedException {
         synchronized (this) {
             if (isInternal())
                 return;
@@ -3310,7 +3301,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter
impleme
 
             CacheDataRow row = cctx.offheap().read(this);
 
-            if (row != null)
+            if (row != null && (filter == null || filter.apply(row)))
                 clo.apply(row);
         }
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/65080675/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
index 761b787..84c69a9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
@@ -186,18 +186,6 @@ public interface IgniteCacheOffheapManager {
     /**
      * @param cctx Cache context.
      * @param key Key.
-     * @param part Partition.
-     * @throws IgniteCheckedException If failed.
-     */
-    public void updateIndexes(
-        GridCacheContext cctx,
-        KeyCacheObject key,
-        GridDhtLocalPartition part
-    ) throws IgniteCheckedException;
-
-    /**
-     * @param cctx Cache context.
-     * @param key Key.
      * @param partId Partition number.
      * @param part Partition.
      * @throws IgniteCheckedException If failed.
@@ -454,13 +442,6 @@ public interface IgniteCacheOffheapManager {
         /**
          * @param cctx Cache context.
          * @param key Key.
-         * @throws IgniteCheckedException If failed.
-         */
-        void updateIndexes(GridCacheContext cctx, KeyCacheObject key) throws IgniteCheckedException;
-
-        /**
-         * @param cctx Cache context.
-         * @param key Key.
          * @param c Closure.
          * @throws IgniteCheckedException If failed.
          */

http://git-wip-us.apache.org/repos/asf/ignite/blob/65080675/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
index c85ba1d..370a92e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
@@ -359,12 +359,6 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
     }
 
     /** {@inheritDoc} */
-    @Override public void updateIndexes(GridCacheContext cctx, KeyCacheObject key, GridDhtLocalPartition
part)
-        throws IgniteCheckedException {
-        dataStore(part).updateIndexes(cctx, key);
-    }
-
-    /** {@inheritDoc} */
     @Override public void remove(
         GridCacheContext cctx,
         KeyCacheObject key,
@@ -1362,21 +1356,6 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
         }
 
         /** {@inheritDoc} */
-        @Override public void updateIndexes(GridCacheContext cctx, KeyCacheObject key) throws
IgniteCheckedException {
-            int cacheId = grp.sharedGroup() ? cctx.cacheId() : CU.UNDEFINED_CACHE_ID;
-
-            CacheDataRow row = dataTree.findOne(new SearchRow(cacheId, key), CacheDataRowAdapter.RowData.NO_KEY);
-
-            if (row != null) {
-                row.key(key);
-
-                GridCacheQueryManager qryMgr = cctx.queries();
-
-                qryMgr.store(row, null, false);
-            }
-        }
-
-        /** {@inheritDoc} */
         @Override public void remove(GridCacheContext cctx, KeyCacheObject key, int partId)
throws IgniteCheckedException {
             if (!busyLock.enterBusy())
                 throw new NodeStoppingException("Operation has been cancelled (node is stopping).");

http://git-wip-us.apache.org/repos/asf/ignite/blob/65080675/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
index 6ed62f8..cfa1829 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
@@ -1236,13 +1236,6 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl
imple
         }
 
         /** {@inheritDoc} */
-        @Override public void updateIndexes(GridCacheContext cctx, KeyCacheObject key) throws
IgniteCheckedException {
-            CacheDataStore delegate = init0(false);
-
-            delegate.updateIndexes(cctx, key);
-        }
-
-        /** {@inheritDoc} */
         @Override public CacheDataRow createRow(
             GridCacheContext cctx,
             KeyCacheObject key,

http://git-wip-us.apache.org/repos/asf/ignite/blob/65080675/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index ed5fdd9..471888a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -73,6 +73,7 @@ import org.apache.ignite.internal.processors.cache.query.CacheQueryType;
 import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType;
 import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
 import org.apache.ignite.internal.processors.query.property.QueryBinaryProperty;
+import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheFilter;
 import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitor;
 import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitorImpl;
 import org.apache.ignite.internal.processors.query.schema.SchemaIndexOperationCancellationToken;
@@ -96,10 +97,12 @@ import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.lang.GridCloseableIterator;
 import org.apache.ignite.internal.util.lang.GridClosureException;
 import org.apache.ignite.internal.util.lang.IgniteOutClosureX;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.T3;
 import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.internal.util.worker.GridWorker;
 import org.apache.ignite.internal.util.worker.GridWorkerFuture;
@@ -1319,12 +1322,15 @@ public class GridQueryProcessor extends GridProcessorAdapter {
 
         try {
             if (op instanceof SchemaIndexCreateOperation) {
-                SchemaIndexCreateOperation op0 = (SchemaIndexCreateOperation)op;
+                final SchemaIndexCreateOperation op0 = (SchemaIndexCreateOperation)op;
 
                 QueryIndexDescriptorImpl idxDesc = QueryUtils.createIndexDescriptor(type,
op0.index());
 
-                SchemaIndexCacheVisitor visitor =
-                    new SchemaIndexCacheVisitorImpl(this, cache.context(), cacheName, op0.tableName(),
cancelTok);
+                GridCacheContext cctx = cache.context();
+
+                SchemaIndexCacheFilter filter = new TableCacheFilter(cctx, op0.tableName());
+
+                SchemaIndexCacheVisitor visitor = new SchemaIndexCacheVisitorImpl(cctx, filter,
cancelTok);
 
                 idx.dynamicIndexCreate(op0.schemaName(), op0.tableName(), idxDesc, op0.ifNotExists(),
visitor);
             }
@@ -2869,4 +2875,43 @@ public class GridQueryProcessor extends GridProcessorAdapter {
             this.mgr = mgr;
         }
     }
+
+    /** */
+    private static class TableCacheFilter implements SchemaIndexCacheFilter {
+        /** */
+        @GridToStringExclude
+        private final GridCacheContext cctx;
+
+        /** */
+        @GridToStringExclude
+        private final GridQueryProcessor query;
+
+        /** */
+        private final String cacheName;
+
+        /** */
+        private final String tableName;
+
+        /**
+         * @param cctx Cache context.
+         * @param tableName Target table name.
+         */
+        TableCacheFilter(GridCacheContext cctx, String tableName) {
+            this.cctx = cctx;
+            this.tableName = tableName;
+
+            cacheName = cctx.name();
+            query = cctx.kernalContext().query();
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean apply(CacheDataRow row) throws IgniteCheckedException {
+            return query.belongsToTable(cctx, cacheName, tableName, row.key(), row.value());
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(TableCacheFilter.class, this);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/65080675/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCacheFilter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCacheFilter.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCacheFilter.java
new file mode 100644
index 0000000..32600c6
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCacheFilter.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.schema;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
+
+/**
+ * Index row filter accepting current entry.
+ */
+public interface SchemaIndexCacheFilter {
+    /**
+     * @param row Cache data row.
+     * @return {@code True} if row passes the filter.
+     * @throws IgniteCheckedException If failed.
+     */
+    boolean apply(CacheDataRow row) throws IgniteCheckedException;
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/65080675/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCacheVisitorImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCacheVisitorImpl.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCacheVisitorImpl.java
index fda7d1d..c11c614 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCacheVisitorImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCacheVisitorImpl.java
@@ -17,7 +17,7 @@
 
 package org.apache.ignite.internal.processors.query.schema;
 
-import java.util.Collection;
+import java.util.List;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
@@ -29,7 +29,6 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalP
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheAdapter;
 import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
 import org.apache.ignite.internal.processors.cache.persistence.CacheDataRowAdapter;
-import org.apache.ignite.internal.processors.query.GridQueryProcessor;
 import org.apache.ignite.internal.util.lang.GridCursor;
 import org.apache.ignite.internal.util.typedef.internal.S;
 
@@ -40,35 +39,36 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
 /**
  * Traversor operating all primary and backup partitions of given cache.
  */
+@SuppressWarnings("ForLoopReplaceableByForEach")
 public class SchemaIndexCacheVisitorImpl implements SchemaIndexCacheVisitor {
-    /** Query procssor. */
-    private final GridQueryProcessor qryProc;
+    /** Count of rows, being processed within a single checkpoint lock. */
+    private static final int BATCH_SIZE = 1000;
 
     /** Cache context. */
     private final GridCacheContext cctx;
 
-    /** Cache name. */
-    private final String cacheName;
-
-    /** Table name. */
-    private final String tblName;
+    /** Row filter. */
+    private final SchemaIndexCacheFilter rowFilter;
 
     /** Cancellation token. */
     private final SchemaIndexOperationCancellationToken cancel;
 
     /**
      * Constructor.
-     *
-     * @param cctx Cache context.
-     * @param cacheName Cache name.
-     * @param tblName Table name.
+     *  @param cctx Cache context.
+     */
+    public SchemaIndexCacheVisitorImpl(GridCacheContext cctx) {
+        this(cctx, null, null);
+    }
+
+    /**
+     * Constructor.
+     *  @param cctx Cache context.
      * @param cancel Cancellation token.
      */
-    public SchemaIndexCacheVisitorImpl(GridQueryProcessor qryProc, GridCacheContext cctx,
String cacheName,
-        String tblName, SchemaIndexOperationCancellationToken cancel) {
-        this.qryProc = qryProc;
-        this.cacheName = cacheName;
-        this.tblName = tblName;
+    public SchemaIndexCacheVisitorImpl(GridCacheContext cctx, SchemaIndexCacheFilter rowFilter,
+        SchemaIndexOperationCancellationToken cancel) {
+        this.rowFilter = rowFilter;
         this.cancel = cancel;
 
         if (cctx.isNear())
@@ -81,12 +81,10 @@ public class SchemaIndexCacheVisitorImpl implements SchemaIndexCacheVisitor
{
     @Override public void visit(SchemaIndexCacheVisitorClosure clo) throws IgniteCheckedException
{
         assert clo != null;
 
-        FilteringVisitorClosure filterClo = new FilteringVisitorClosure(clo);
-
-        Collection<GridDhtLocalPartition> parts = cctx.topology().localPartitions();
+        List<GridDhtLocalPartition> parts = cctx.topology().localPartitions();
 
-        for (GridDhtLocalPartition part : parts)
-            processPartition(part, filterClo);
+        for (int i = 0, size = parts.size(); i < size; i++)
+            processPartition(parts.get(i), clo);
     }
 
     /**
@@ -96,7 +94,7 @@ public class SchemaIndexCacheVisitorImpl implements SchemaIndexCacheVisitor
{
      * @param clo Index closure.
      * @throws IgniteCheckedException If failed.
      */
-    private void processPartition(GridDhtLocalPartition part, FilteringVisitorClosure clo)
+    private void processPartition(GridDhtLocalPartition part, SchemaIndexCacheVisitorClosure
clo)
         throws IgniteCheckedException {
         checkCancelled();
 
@@ -114,15 +112,35 @@ public class SchemaIndexCacheVisitorImpl implements SchemaIndexCacheVisitor
{
                 null,
                 CacheDataRowAdapter.RowData.KEY_ONLY);
 
-            while (cursor.next()) {
-                CacheDataRow row = cursor.get();
+            boolean locked = false;
+
+            try {
+                int cntr = 0;
+
+                while (cursor.next()) {
+                    KeyCacheObject key = cursor.get().key();
+
+                    if (!locked) {
+                        cctx.shared().database().checkpointReadLock();
 
-                KeyCacheObject key = row.key();
+                        locked = true;
+                    }
 
-                processKey(key, clo);
+                    processKey(key, clo);
 
-                if (part.state() == RENTING)
-                    break;
+                    if (++cntr % BATCH_SIZE == 0) {
+                        cctx.shared().database().checkpointReadUnlock();
+
+                        locked = false;
+                    }
+
+                    if (part.state() == RENTING)
+                        break;
+                }
+            }
+            finally {
+                if (locked)
+                    cctx.shared().database().checkpointReadUnlock();
             }
         }
         finally {
@@ -137,7 +155,7 @@ public class SchemaIndexCacheVisitorImpl implements SchemaIndexCacheVisitor
{
      * @param clo Closure.
      * @throws IgniteCheckedException If failed.
      */
-    private void processKey(KeyCacheObject key, FilteringVisitorClosure clo) throws IgniteCheckedException
{
+    private void processKey(KeyCacheObject key, SchemaIndexCacheVisitorClosure clo) throws
IgniteCheckedException {
         while (true) {
             try {
                 checkCancelled();
@@ -145,7 +163,7 @@ public class SchemaIndexCacheVisitorImpl implements SchemaIndexCacheVisitor
{
                 GridCacheEntryEx entry = cctx.cache().entryEx(key);
 
                 try {
-                    entry.updateIndex(clo);
+                    entry.updateIndex(rowFilter, clo);
                 }
                 finally {
                     cctx.evicts().touch(entry, AffinityTopologyVersion.NONE);
@@ -168,7 +186,7 @@ public class SchemaIndexCacheVisitorImpl implements SchemaIndexCacheVisitor
{
      * @throws IgniteCheckedException If cancelled.
      */
     private void checkCancelled() throws IgniteCheckedException {
-        if (cancel.isCancelled())
+        if (cancel != null && cancel.isCancelled())
             throw new IgniteCheckedException("Index creation was cancelled.");
     }
 
@@ -176,28 +194,4 @@ public class SchemaIndexCacheVisitorImpl implements SchemaIndexCacheVisitor
{
     @Override public String toString() {
         return S.toString(SchemaIndexCacheVisitorImpl.class, this);
     }
-
-    /**
-     * Filtering visitor closure.
-     */
-    private class FilteringVisitorClosure implements SchemaIndexCacheVisitorClosure {
-
-        /** Target closure. */
-        private final SchemaIndexCacheVisitorClosure target;
-
-        /**
-         * Constructor.
-         *
-         * @param target Target.
-         */
-        FilteringVisitorClosure(SchemaIndexCacheVisitorClosure target) {
-            this.target = target;
-        }
-
-        /** {@inheritDoc} */
-        @Override public void apply(CacheDataRow row) throws IgniteCheckedException {
-            if (qryProc.belongsToTable(cctx, cacheName, tblName, row.key(), row.value()))
-                target.apply(row);
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/65080675/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
index 2ba8fd8..3d7edac 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
@@ -33,6 +33,7 @@ import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersionedEntryEx;
 import org.apache.ignite.internal.processors.dr.GridDrType;
+import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheFilter;
 import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitorClosure;
 import org.apache.ignite.internal.util.lang.GridMetadataAwareAdapter;
 import org.apache.ignite.internal.util.lang.GridTuple3;
@@ -817,11 +818,6 @@ public class GridCacheTestEntryEx extends GridMetadataAwareAdapter implements
Gr
     }
 
     /** {@inheritDoc} */
-    @Override public void ensureIndexed() throws GridCacheEntryRemovedException, IgniteCheckedException
{
-        // No-op.
-    }
-
-    /** {@inheritDoc} */
     @Override public CacheObject unswap() throws IgniteCheckedException {
         return null;
     }
@@ -842,8 +838,8 @@ public class GridCacheTestEntryEx extends GridMetadataAwareAdapter implements
Gr
     }
 
     /** {@inheritDoc} */
-    @Override public void updateIndex(SchemaIndexCacheVisitorClosure clo) throws IgniteCheckedException,
-        GridCacheEntryRemovedException {
+    @Override public void updateIndex(SchemaIndexCacheFilter filter, SchemaIndexCacheVisitorClosure
clo)
+        throws IgniteCheckedException, GridCacheEntryRemovedException {
         // No-op.
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/65080675/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index c37e5f0..f3a95a5 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -65,17 +65,13 @@ import org.apache.ignite.internal.processors.cache.CacheEntryImpl;
 import org.apache.ignite.internal.processors.cache.CacheObjectUtils;
 import org.apache.ignite.internal.processors.cache.CacheObjectValueContext;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
-import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
-import org.apache.ignite.internal.processors.cache.IgniteCacheOffheapManager;
-import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.QueryCursorImpl;
-import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
 import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
 import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
 import org.apache.ignite.internal.processors.cache.query.CacheQueryPartitionInfo;
+import org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager;
 import org.apache.ignite.internal.processors.cache.query.GridCacheQueryMarshallable;
 import org.apache.ignite.internal.processors.cache.query.GridCacheTwoStepQuery;
 import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
@@ -116,6 +112,7 @@ import org.apache.ignite.internal.processors.query.h2.twostep.GridReduceQueryExe
 import org.apache.ignite.internal.processors.query.h2.twostep.MapQueryLazyWorker;
 import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitor;
 import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitorClosure;
+import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitorImpl;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
 import org.apache.ignite.internal.sql.SqlParseException;
 import org.apache.ignite.internal.sql.SqlParser;
@@ -2021,44 +2018,11 @@ public class IgniteH2Indexing implements GridQueryIndexing {
 
         GridCacheContext cctx = ctx.cache().context().cacheContext(cacheId);
 
-        IgniteCacheOffheapManager offheapMgr = cctx.isNear() ? cctx.near().dht().context().offheap()
: cctx.offheap();
+        final GridCacheQueryManager qryMgr = cctx.queries();
 
-        for (int p = 0; p < cctx.affinity().partitions(); p++) {
-            try (GridCloseableIterator<KeyCacheObject> keyIter = offheapMgr.cacheKeysIterator(cctx.cacheId(),
p)) {
-                while (keyIter.hasNext()) {
-                    cctx.shared().database().checkpointReadLock();
+        SchemaIndexCacheVisitor visitor = new SchemaIndexCacheVisitorImpl(cctx);
 
-                    try {
-                        KeyCacheObject key = keyIter.next();
-
-                        while (true) {
-                            GridCacheEntryEx entry = null;
-
-                            try {
-                                entry = cctx.isNear() ?
-                                    cctx.near().dht().entryEx(key) : cctx.cache().entryEx(key);
-
-                                entry.ensureIndexed();
-
-                                break;
-                            }
-                            catch (GridCacheEntryRemovedException ignore) {
-                                // Retry.
-                            }
-                            catch (GridDhtInvalidPartitionException ignore) {
-                                break;
-                            }
-                            finally {
-                                entry.context().evicts().touch(entry, AffinityTopologyVersion.NONE);
-                            }
-                        }
-                    }
-                    finally {
-                        cctx.shared().database().checkpointReadUnlock();
-                    }
-                }
-            }
-        }
+        visitor.visit(new RebuldIndexFromHashClosure(qryMgr));
 
         for (H2TableDescriptor tblDesc : tables(cacheName))
             tblDesc.table().markRebuildFromHashInProgress(false);
@@ -2653,4 +2617,22 @@ public class IgniteH2Indexing implements GridQueryIndexing {
     private interface ClIter<X> extends AutoCloseable, Iterator<X> {
         // No-op.
     }
+
+    /** */
+    private static class RebuldIndexFromHashClosure implements SchemaIndexCacheVisitorClosure
{
+        /** */
+        private final GridCacheQueryManager qryMgr;
+
+        /**
+         * @param qryMgr Query manager.
+         */
+        RebuldIndexFromHashClosure(GridCacheQueryManager qryMgr) {
+            this.qryMgr = qryMgr;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void apply(CacheDataRow row) throws IgniteCheckedException {
+            qryMgr.store(row, null, false);
+        }
+    }
 }


Mime
View raw message