ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ira...@apache.org
Subject ignite git commit: IGNITE-8499 validate_indexes command doesn't detect absent rows in cache data tree
Date Wed, 16 May 2018 17:14:06 GMT
Repository: ignite
Updated Branches:
  refs/heads/master 771e8619e -> 88a6bfdd9


IGNITE-8499 validate_indexes command doesn't detect absent rows in cache data tree


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/88a6bfdd
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/88a6bfdd
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/88a6bfdd

Branch: refs/heads/master
Commit: 88a6bfdd9c7faea3d230b9959c773900b94356b1
Parents: 771e861
Author: Ivan Rakov <irakov@apache.org>
Authored: Wed May 16 20:13:30 2018 +0300
Committer: Ivan Rakov <irakov@apache.org>
Committed: Wed May 16 20:13:30 2018 +0300

----------------------------------------------------------------------
 .../internal/commandline/CommandHandler.java    |  21 +-
 .../verify/ValidateIndexesPartitionResult.java  |  31 ++-
 .../verify/VisorValidateIndexesJobResult.java   |  38 ++-
 .../visor/verify/ValidateIndexesClosure.java    | 264 ++++++++++++++-----
 .../visor/verify/VisorValidateIndexesTask.java  |   6 +-
 .../IgniteCacheWithIndexingTestSuite.java       |   3 +
 .../util/GridCommandHandlerIndexingTest.java    | 203 +++++++++++++-
 7 files changed, 477 insertions(+), 89 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/88a6bfdd/modules/core/src/main/java/org/apache/ignite/internal/commandline/CommandHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/commandline/CommandHandler.java
b/modules/core/src/main/java/org/apache/ignite/internal/commandline/CommandHandler.java
index 7d457fd..04578e5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/commandline/CommandHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/commandline/CommandHandler.java
@@ -100,8 +100,8 @@ import static org.apache.ignite.internal.commandline.Command.BASELINE;
 import static org.apache.ignite.internal.commandline.Command.CACHE;
 import static org.apache.ignite.internal.commandline.Command.DEACTIVATE;
 import static org.apache.ignite.internal.commandline.Command.STATE;
-import static org.apache.ignite.internal.commandline.Command.WAL;
 import static org.apache.ignite.internal.commandline.Command.TX;
+import static org.apache.ignite.internal.commandline.Command.WAL;
 import static org.apache.ignite.internal.visor.baseline.VisorBaselineOperation.ADD;
 import static org.apache.ignite.internal.visor.baseline.VisorBaselineOperation.COLLECT;
 import static org.apache.ignite.internal.visor.baseline.VisorBaselineOperation.REMOVE;
@@ -635,9 +635,9 @@ public class CommandHandler {
         boolean errors = false;
 
         for (Map.Entry<UUID, VisorValidateIndexesJobResult> nodeEntry : taskRes.results().entrySet())
{
-            Map<PartitionKey, ValidateIndexesPartitionResult> map = nodeEntry.getValue().response();
+            Map<PartitionKey, ValidateIndexesPartitionResult> partRes = nodeEntry.getValue().partitionResult();
 
-            for (Map.Entry<PartitionKey, ValidateIndexesPartitionResult> e : map.entrySet())
{
+            for (Map.Entry<PartitionKey, ValidateIndexesPartitionResult> e : partRes.entrySet())
{
                 ValidateIndexesPartitionResult res = e.getValue();
 
                 if (!res.issues().isEmpty()) {
@@ -649,6 +649,21 @@ public class CommandHandler {
                         log(is.toString());
                 }
             }
+
+            Map<String, ValidateIndexesPartitionResult> idxRes = nodeEntry.getValue().indexResult();
+
+            for (Map.Entry<String, ValidateIndexesPartitionResult> e : idxRes.entrySet())
{
+                ValidateIndexesPartitionResult res = e.getValue();
+
+                if (!res.issues().isEmpty()) {
+                    errors = true;
+
+                    log("SQL Index " + e.getKey() + " " + e.getValue().toString());
+
+                    for (IndexValidationIssue is : res.issues())
+                        log(is.toString());
+                }
+            }
         }
 
         if (!errors)

http://git-wip-us.apache.org/repos/asf/ignite/blob/88a6bfdd/modules/core/src/main/java/org/apache/ignite/internal/visor/verify/ValidateIndexesPartitionResult.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/verify/ValidateIndexesPartitionResult.java
b/modules/core/src/main/java/org/apache/ignite/internal/visor/verify/ValidateIndexesPartitionResult.java
index 1889960..5d74a57 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/verify/ValidateIndexesPartitionResult.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/verify/ValidateIndexesPartitionResult.java
@@ -29,7 +29,7 @@ import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.internal.visor.VisorDataTransferObject;
 
 /**
- *
+ * Encapsulates intermediate results of validation of SQL index (if {@link #sqlIdxName} is
present) or partition.
  */
 public class ValidateIndexesPartitionResult extends VisorDataTransferObject {
     /** */
@@ -52,6 +52,10 @@ public class ValidateIndexesPartitionResult extends VisorDataTransferObject
{
     @GridToStringExclude
     private List<IndexValidationIssue> issues = new ArrayList<>(10);
 
+    /** Sql index name. */
+    @GridToStringExclude
+    private String sqlIdxName;
+
     /**
      *
      */
@@ -64,12 +68,15 @@ public class ValidateIndexesPartitionResult extends VisorDataTransferObject
{
      * @param size Size.
      * @param isPrimary Is primary.
      * @param consistentId Consistent id.
+     * @param sqlIdxName Sql index name (optional).
      */
-    public ValidateIndexesPartitionResult(long updateCntr, long size, boolean isPrimary,
Object consistentId) {
+    public ValidateIndexesPartitionResult(long updateCntr, long size, boolean isPrimary,
Object consistentId,
+        String sqlIdxName) {
         this.updateCntr = updateCntr;
         this.size = size;
         this.isPrimary = isPrimary;
         this.consistentId = consistentId;
+        this.sqlIdxName = sqlIdxName;
     }
 
     /**
@@ -108,6 +115,13 @@ public class ValidateIndexesPartitionResult extends VisorDataTransferObject
{
     }
 
     /**
+     * @return <code>null</code> for partition validation result, SQL index name
for index validation result
+     */
+    public String sqlIndexName() {
+        return sqlIdxName;
+    }
+
+    /**
      * @param t Issue.
      * @return True if there are already enough issues.
      */
@@ -121,12 +135,18 @@ public class ValidateIndexesPartitionResult extends VisorDataTransferObject
{
     }
 
     /** {@inheritDoc} */
+    @Override public byte getProtocolVersion() {
+        return V2;
+    }
+
+    /** {@inheritDoc} */
     @Override protected void writeExternalData(ObjectOutput out) throws IOException {
         out.writeLong(updateCntr);
         out.writeLong(size);
         out.writeBoolean(isPrimary);
         out.writeObject(consistentId);
         U.writeCollection(out, issues);
+        U.writeString(out, sqlIdxName);
     }
 
     /** {@inheritDoc} */
@@ -136,10 +156,15 @@ public class ValidateIndexesPartitionResult extends VisorDataTransferObject
{
         isPrimary = in.readBoolean();
         consistentId = in.readObject();
         issues = U.readList(in);
+
+        if (protoVer >= V2)
+            sqlIdxName = U.readString(in);
     }
 
     /** {@inheritDoc} */
     @Override public String toString() {
-        return S.toString(ValidateIndexesPartitionResult.class, this);
+        return sqlIdxName == null ? S.toString(ValidateIndexesPartitionResult.class, this)
:
+            ValidateIndexesPartitionResult.class.getSimpleName() + " [consistentId=" + consistentId
+
+                ", sqlIdxName=" + sqlIdxName + "]";
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/88a6bfdd/modules/core/src/main/java/org/apache/ignite/internal/visor/verify/VisorValidateIndexesJobResult.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/verify/VisorValidateIndexesJobResult.java
b/modules/core/src/main/java/org/apache/ignite/internal/visor/verify/VisorValidateIndexesJobResult.java
index 25c97b6..aa74323 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/verify/VisorValidateIndexesJobResult.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/verify/VisorValidateIndexesJobResult.java
@@ -34,13 +34,19 @@ public class VisorValidateIndexesJobResult extends VisorDataTransferObject
{
     private static final long serialVersionUID = 0L;
 
     /** Results of indexes validation from node. */
-    private Map<PartitionKey, ValidateIndexesPartitionResult> res;
+    private Map<PartitionKey, ValidateIndexesPartitionResult> partRes;
+
+    /** Results of reverse indexes validation from node. */
+    private Map<String, ValidateIndexesPartitionResult> idxRes;
 
     /**
-     * @param res Results of indexes validation from node.
+     * @param partRes Results of indexes validation from node.
+     * @param idxRes Results of reverse indexes validation from node.
      */
-    public VisorValidateIndexesJobResult(Map<PartitionKey, ValidateIndexesPartitionResult>
res) {
-        this.res = res;
+    public VisorValidateIndexesJobResult(Map<PartitionKey, ValidateIndexesPartitionResult>
partRes,
+        Map<String, ValidateIndexesPartitionResult> idxRes) {
+        this.partRes = partRes;
+        this.idxRes = idxRes;
     }
 
     /**
@@ -49,21 +55,37 @@ public class VisorValidateIndexesJobResult extends VisorDataTransferObject
{
     public VisorValidateIndexesJobResult() {
     }
 
+    /** {@inheritDoc} */
+    @Override public byte getProtocolVersion() {
+        return V2;
+    }
+
     /**
      * @return Results of indexes validation from node.
      */
-    public Map<PartitionKey, ValidateIndexesPartitionResult> response() {
-        return res;
+    public Map<PartitionKey, ValidateIndexesPartitionResult> partitionResult() {
+        return partRes;
+    }
+
+    /**
+     * @return Results of reverse indexes validation from node.
+     */
+    public Map<String, ValidateIndexesPartitionResult> indexResult() {
+        return idxRes;
     }
 
     /** {@inheritDoc} */
     @Override protected void writeExternalData(ObjectOutput out) throws IOException {
-        U.writeMap(out, res);
+        U.writeMap(out, partRes);
+        U.writeMap(out, idxRes);
     }
 
     /** {@inheritDoc} */
     @Override protected void readExternalData(byte protoVer, ObjectInput in) throws IOException,
ClassNotFoundException {
-        res = U.readMap(in);
+        partRes = U.readMap(in);
+
+        if (protoVer >= V2)
+            idxRes = U.readMap(in);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/88a6bfdd/modules/indexing/src/main/java/org/apache/ignite/internal/visor/verify/ValidateIndexesClosure.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/visor/verify/ValidateIndexesClosure.java
b/modules/indexing/src/main/java/org/apache/ignite/internal/visor/verify/ValidateIndexesClosure.java
index 373bd15..e0eff61 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/visor/verify/ValidateIndexesClosure.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/visor/verify/ValidateIndexesClosure.java
@@ -31,9 +31,8 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteInterruptedException;
@@ -51,12 +50,15 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartit
 import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
 import org.apache.ignite.internal.processors.cache.verify.PartitionKey;
 import org.apache.ignite.internal.processors.query.GridQueryProcessor;
+import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor;
 import org.apache.ignite.internal.processors.query.QueryTypeDescriptorImpl;
 import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2Row;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2RowDescriptor;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table;
 import org.apache.ignite.internal.util.lang.GridIterator;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteCallable;
 import org.apache.ignite.resources.IgniteInstanceResource;
@@ -66,9 +68,13 @@ import org.h2.index.Cursor;
 import org.h2.index.Index;
 
 /**
- *
+ * Closure that locally validates indexes of given caches.
+ * Validation consists of three checks:
+ * 1. If entry is present in cache data tree, it's reachable from all cache SQL indexes
+ * 2. If entry is present in cache SQL index, it can be dereferenced with link from index
+ * 3. If entry is present in cache SQL index, it's present in cache data tree
  */
-public class ValidateIndexesClosure implements IgniteCallable<Map<PartitionKey, ValidateIndexesPartitionResult>>
{
+public class ValidateIndexesClosure implements IgniteCallable<VisorValidateIndexesJobResult>
{
     /** */
     private static final long serialVersionUID = 0L;
 
@@ -84,7 +90,19 @@ public class ValidateIndexesClosure implements IgniteCallable<Map<PartitionKey,
     private Set<String> cacheNames;
 
     /** Counter of processed partitions. */
-    private final AtomicInteger completionCntr = new AtomicInteger(0);
+    private final AtomicInteger processedPartitions = new AtomicInteger(0);
+
+    /** Total partitions. */
+    private volatile int totalPartitions;
+
+    /** Counter of processed indexes. */
+    private final AtomicInteger processedIndexes = new AtomicInteger(0);
+
+    /** Total partitions. */
+    private volatile int totalIndexes;
+
+    /** Last progress print timestamp. */
+    private final AtomicLong lastProgressPrintTs = new AtomicLong(0);
 
     /** Calculation executor. */
     private volatile ExecutorService calcExecutor;
@@ -97,7 +115,7 @@ public class ValidateIndexesClosure implements IgniteCallable<Map<PartitionKey,
     }
 
     /** {@inheritDoc} */
-    @Override public Map<PartitionKey, ValidateIndexesPartitionResult> call() throws
Exception {
+    @Override public VisorValidateIndexesJobResult call() throws Exception {
         calcExecutor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
 
         try {
@@ -111,7 +129,7 @@ public class ValidateIndexesClosure implements IgniteCallable<Map<PartitionKey,
     /**
      *
      */
-    private Map<PartitionKey, ValidateIndexesPartitionResult> call0() throws Exception
{
+    private VisorValidateIndexesJobResult call0() throws Exception {
         Set<Integer> grpIds = new HashSet<>();
 
         Set<String> missingCaches = new HashSet<>();
@@ -150,8 +168,9 @@ public class ValidateIndexesClosure implements IgniteCallable<Map<PartitionKey,
         }
 
         List<Future<Map<PartitionKey, ValidateIndexesPartitionResult>>>
procPartFutures = new ArrayList<>();
-
-        completionCntr.set(0);
+        List<Future<Map<String, ValidateIndexesPartitionResult>>> procIdxFutures
= new ArrayList<>();
+        List<T2<CacheGroupContext, GridDhtLocalPartition>> partArgs = new ArrayList<>();
+        List<T2<GridCacheContext, Index>> idxArgs = new ArrayList<>();
 
         for (Integer grpId : grpIds) {
             CacheGroupContext grpCtx = ignite.context().cache().cacheGroup(grpId);
@@ -162,45 +181,82 @@ public class ValidateIndexesClosure implements IgniteCallable<Map<PartitionKey,
             List<GridDhtLocalPartition> parts = grpCtx.topology().localPartitions();
 
             for (GridDhtLocalPartition part : parts)
-                procPartFutures.add(processPartitionAsync(grpCtx, part));
-        }
+                partArgs.add(new T2<>(grpCtx, part));
 
-        Map<PartitionKey, ValidateIndexesPartitionResult> res = new HashMap<>();
+            GridQueryProcessor qry = ignite.context().query();
 
-        long lastProgressLogTs = U.currentTimeMillis();
+            IgniteH2Indexing indexing = (IgniteH2Indexing)qry.getIndexing();
 
-        for (int i = 0; i < procPartFutures.size(); ) {
-            Future<Map<PartitionKey, ValidateIndexesPartitionResult>> fut = procPartFutures.get(i);
+            for (GridCacheContext ctx : grpCtx.caches()) {
+                Collection<GridQueryTypeDescriptor> types = qry.types(ctx.name());
 
-            try {
-                Map<PartitionKey, ValidateIndexesPartitionResult> partRes = fut.get(1,
TimeUnit.SECONDS);
+                if (!F.isEmpty(types)) {
+                    for (GridQueryTypeDescriptor type : types) {
+                        GridH2Table gridH2Tbl = indexing.dataTable(ctx.name(), type.tableName());
+
+                        if (gridH2Tbl == null)
+                            continue;
 
-                res.putAll(partRes);
+                        ArrayList<Index> indexes = gridH2Tbl.getIndexes();
 
-                i++;
+                        for (Index idx : indexes)
+                            idxArgs.add(new T2<>(ctx, idx));
+                    }
+                }
             }
-            catch (InterruptedException | ExecutionException e) {
-                for (int j = i + 1; j < procPartFutures.size(); j++)
-                    procPartFutures.get(j).cancel(false);
-
-                if (e instanceof InterruptedException)
-                    throw new IgniteInterruptedException((InterruptedException)e);
-                else if (e.getCause() instanceof IgniteException)
-                    throw (IgniteException)e.getCause();
-                else
-                    throw new IgniteException(e.getCause());
+        }
+
+        // To decrease contention on same indexes.
+        Collections.shuffle(partArgs);
+        Collections.shuffle(idxArgs);
+
+        for (T2<CacheGroupContext, GridDhtLocalPartition> t2 : partArgs)
+            procPartFutures.add(processPartitionAsync(t2.get1(), t2.get2()));
+
+        for (T2<GridCacheContext, Index> t2 : idxArgs)
+            procIdxFutures.add(processIndexAsync(t2.get1(), t2.get2()));
+
+        totalPartitions = procPartFutures.size();
+        totalIndexes = procIdxFutures.size();
+
+        Map<PartitionKey, ValidateIndexesPartitionResult> partResults = new HashMap<>();
+        Map<String, ValidateIndexesPartitionResult> idxResults = new HashMap<>();
+
+        int curPart = 0;
+        int curIdx = 0;
+        try {
+            for (; curPart < procPartFutures.size(); curPart++) {
+                Future<Map<PartitionKey, ValidateIndexesPartitionResult>> fut
= procPartFutures.get(curPart);
+
+                Map<PartitionKey, ValidateIndexesPartitionResult> partRes = fut.get();
+
+                partResults.putAll(partRes);
             }
-            catch (TimeoutException ignored) {
-                if (U.currentTimeMillis() - lastProgressLogTs > 60 * 1000L) {
-                    lastProgressLogTs = U.currentTimeMillis();
 
-                    log.warning("ValidateIndexesClosure is still running, processed " + completionCntr.get()
+ " of " +
-                        procPartFutures.size() + " local partitions");
-                }
+            for (; curIdx < procIdxFutures.size(); curIdx++) {
+                Future<Map<String, ValidateIndexesPartitionResult>> fut = procIdxFutures.get(curIdx);
+
+                Map<String, ValidateIndexesPartitionResult> idxRes = fut.get();
+
+                idxResults.putAll(idxRes);
             }
         }
+        catch (InterruptedException | ExecutionException e) {
+            for (int j = curPart; j < procPartFutures.size(); j++)
+                procPartFutures.get(j).cancel(false);
+
+            for (int j = curIdx; j < procIdxFutures.size(); j++)
+                procIdxFutures.get(j).cancel(false);
+
+            if (e instanceof InterruptedException)
+                throw new IgniteInterruptedException((InterruptedException)e);
+            else if (e.getCause() instanceof IgniteException)
+                throw (IgniteException)e.getCause();
+            else
+                throw new IgniteException(e.getCause());
+        }
 
-        return res;
+        return new VisorValidateIndexesJobResult(partResults, idxResults);
     }
 
     /**
@@ -245,12 +301,24 @@ public class ValidateIndexesClosure implements IgniteCallable<Map<PartitionKey,
 
             boolean isPrimary = part.primary(grpCtx.topology().readyTopologyVersion());
 
-            partRes = new ValidateIndexesPartitionResult(updateCntrBefore, partSize, isPrimary,
consId);
+            partRes = new ValidateIndexesPartitionResult(updateCntrBefore, partSize, isPrimary,
consId, null);
 
             boolean enoughIssues = false;
 
-            long keysProcessed = 0;
-            long lastProgressLog = U.currentTimeMillis();
+            GridQueryProcessor qryProcessor = ignite.context().query();
+
+            Method m;
+            try {
+                m = GridQueryProcessor.class.getDeclaredMethod("typeByValue", String.class,
+                    CacheObjectContext.class, KeyCacheObject.class, CacheObject.class, boolean.class);
+            }
+            catch (NoSuchMethodException e) {
+                log.error("Failed to invoke typeByValue", e);
+
+                throw new IgniteException(e);
+            }
+
+            m.setAccessible(true);
 
             while (it.hasNextX()) {
                 if (enoughIssues)
@@ -266,14 +334,7 @@ public class ValidateIndexesClosure implements IgniteCallable<Map<PartitionKey,
                 if (cacheCtx == null)
                     throw new IgniteException("Unknown cacheId of CacheDataRow: " + cacheId);
 
-                GridQueryProcessor qryProcessor = ignite.context().query();
-
                 try {
-                    Method m = GridQueryProcessor.class.getDeclaredMethod("typeByValue",
String.class,
-                        CacheObjectContext.class, KeyCacheObject.class, CacheObject.class,
boolean.class);
-
-                    m.setAccessible(true);
-
                     QueryTypeDescriptorImpl res = (QueryTypeDescriptorImpl)m.invoke(
                         qryProcessor, cacheCtx.name(), cacheCtx.cacheObjectContext(), row.key(),
row.value(), true);
 
@@ -298,7 +359,7 @@ public class ValidateIndexesClosure implements IgniteCallable<Map<PartitionKey,
                             Cursor cursor = idx.find((Session) null, h2Row, h2Row);
 
                             if (cursor == null || !cursor.next())
-                                throw new IgniteCheckedException("Key not found.");
+                                throw new IgniteCheckedException("Key is present in CacheDataTree,
but can't be found in SQL index.");
                         }
                         catch (Throwable t) {
                             Object o = CacheObjectUtils.unwrapBinaryIfNeeded(
@@ -313,7 +374,7 @@ public class ValidateIndexesClosure implements IgniteCallable<Map<PartitionKey,
                         }
                     }
                 }
-                catch (IllegalAccessException | NoSuchMethodException e) {
+                catch (IllegalAccessException e) {
                     log.error("Failed to invoke typeByValue", e);
 
                     throw new IgniteException(e);
@@ -325,16 +386,6 @@ public class ValidateIndexesClosure implements IgniteCallable<Map<PartitionKey,
 
                     throw new IgniteException(target);
                 }
-                finally {
-                    keysProcessed++;
-
-                    if (U.currentTimeMillis() - lastProgressLog >= 60_000 && partSize
> 0) {
-                        log.warning("Processing partition " + part.id() + " (" + (keysProcessed
* 100 / partSize) +
-                            "% " + keysProcessed + "/" + partSize + ")");
-
-                        lastProgressLog = U.currentTimeMillis();
-                    }
-                }
             }
         }
         catch (IgniteCheckedException e) {
@@ -345,12 +396,107 @@ public class ValidateIndexesClosure implements IgniteCallable<Map<PartitionKey,
         }
         finally {
             part.release();
+
+            printProgressIfNeeded();
         }
 
         PartitionKey partKey = new PartitionKey(grpCtx.groupId(), part.id(), grpCtx.cacheOrGroupName());
 
-        completionCntr.incrementAndGet();
+        processedPartitions.incrementAndGet();
 
         return Collections.singletonMap(partKey, partRes);
     }
+
+    /**
+     *
+     */
+    private void printProgressIfNeeded() {
+        long curTs = U.currentTimeMillis();
+
+        long lastTs = lastProgressPrintTs.get();
+
+        if (curTs - lastTs >= 60_000 && lastProgressPrintTs.compareAndSet(lastTs,
curTs)) {
+            log.warning("Current progress of ValidateIndexesClosure: processed " +
+                processedPartitions.get() + " of " + totalPartitions + " partitions, " +
+                processedIndexes.get() + " of " + totalIndexes + " SQL indexes");
+        }
+    }
+
+    /**
+     * @param ctx Context.
+     * @param idx Index.
+     */
+    private Future<Map<String, ValidateIndexesPartitionResult>> processIndexAsync(GridCacheContext
ctx, Index idx) {
+        return calcExecutor.submit(new Callable<Map<String, ValidateIndexesPartitionResult>>()
{
+            @Override public Map<String, ValidateIndexesPartitionResult> call() throws
Exception {
+                return processIndex(ctx, idx);
+            }
+        });
+    }
+
+    /**
+     * @param ctx Context.
+     * @param idx Index.
+     */
+    private Map<String, ValidateIndexesPartitionResult> processIndex(GridCacheContext
ctx, Index idx) {
+        Object consId = ignite.context().discovery().localNode().consistentId();
+
+        ValidateIndexesPartitionResult idxValidationRes = new ValidateIndexesPartitionResult(
+            -1, -1, true, consId, idx.getName());
+
+        boolean enoughIssues = false;
+
+        Cursor cursor = null;
+
+        try {
+            cursor = idx.find((Session)null, null, null);
+
+            if (cursor == null)
+                throw new IgniteCheckedException("Can't iterate through index: " + idx);
+        }
+        catch (Throwable t) {
+            IndexValidationIssue is = new IndexValidationIssue(null, ctx.name(), idx.getName(),
t);
+
+            log.error("Find in index failed: " + is.toString());
+
+            enoughIssues = true;
+        }
+
+        while (!enoughIssues) {
+            KeyCacheObject h2key = null;
+
+            try {
+                if (!cursor.next())
+                    break;
+
+                GridH2Row h2Row = (GridH2Row)cursor.get();
+
+                h2key = h2Row.key();
+
+                CacheDataRow cacheDataStoreRow = ctx.group().offheap().read(ctx, h2key);
+
+                if (cacheDataStoreRow == null)
+                    throw new IgniteCheckedException("Key is present in SQL index, but can't
be found in CacheDataTree.");
+            }
+            catch (Throwable t) {
+                Object o = CacheObjectUtils.unwrapBinaryIfNeeded(
+                    ctx.cacheObjectContext(), h2key, true, true);
+
+                IndexValidationIssue is = new IndexValidationIssue(
+                    String.valueOf(o), ctx.name(), idx.getName(), t);
+
+                log.error("Failed to lookup key: " + is.toString());
+
+                enoughIssues |= idxValidationRes.reportIssue(is);
+            }
+        }
+
+        String uniqueIdxName = "[cache=" + ctx.name() + ", idx=" + idx.getName() + "]";
+
+        processedIndexes.incrementAndGet();
+
+        printProgressIfNeeded();
+
+        return Collections.singletonMap(uniqueIdxName, idxValidationRes);
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/88a6bfdd/modules/indexing/src/main/java/org/apache/ignite/internal/visor/verify/VisorValidateIndexesTask.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/visor/verify/VisorValidateIndexesTask.java
b/modules/indexing/src/main/java/org/apache/ignite/internal/visor/verify/VisorValidateIndexesTask.java
index 1a89c2c..52b48a5 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/visor/verify/VisorValidateIndexesTask.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/visor/verify/VisorValidateIndexesTask.java
@@ -23,7 +23,6 @@ import java.util.Map;
 import java.util.UUID;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.compute.ComputeJobResult;
-import org.apache.ignite.internal.processors.cache.verify.PartitionKey;
 import org.apache.ignite.internal.processors.task.GridInternal;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.visor.VisorJob;
@@ -81,10 +80,7 @@ public class VisorValidateIndexesTask extends VisorMultiNodeTask<VisorValidateIn
 
                 ignite.context().resource().injectGeneric(clo);
 
-                Map<PartitionKey, ValidateIndexesPartitionResult> res = clo.call();
-
-                return new VisorValidateIndexesJobResult(res);
-
+                return clo.call();
             }
             catch (Exception e) {
                 throw new IgniteException(e);

http://git-wip-us.apache.org/repos/asf/ignite/blob/88a6bfdd/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheWithIndexingTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheWithIndexingTestSuite.java
b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheWithIndexingTestSuite.java
index bc99981..c896736 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheWithIndexingTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheWithIndexingTestSuite.java
@@ -39,6 +39,7 @@ import org.apache.ignite.internal.processors.cache.ttl.CacheTtlTransactionalLoca
 import org.apache.ignite.internal.processors.cache.ttl.CacheTtlTransactionalPartitionedSelfTest;
 import org.apache.ignite.internal.processors.client.IgniteDataStreamerTest;
 import org.apache.ignite.internal.processors.query.h2.database.InlineIndexHelperTest;
+import org.apache.ignite.util.GridCommandHandlerIndexingTest;
 
 /**
  * Cache tests using indexing.
@@ -81,6 +82,8 @@ public class IgniteCacheWithIndexingTestSuite extends TestSuite {
 
         suite.addTestSuite(IgniteDataStreamerTest.class);
 
+        suite.addTestSuite(GridCommandHandlerIndexingTest.class);
+
         return suite;
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/88a6bfdd/modules/indexing/src/test/java/org/apache/ignite/util/GridCommandHandlerIndexingTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/util/GridCommandHandlerIndexingTest.java
b/modules/indexing/src/test/java/org/apache/ignite/util/GridCommandHandlerIndexingTest.java
index 9e9c777..62d3fc0 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/util/GridCommandHandlerIndexingTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/util/GridCommandHandlerIndexingTest.java
@@ -19,17 +19,32 @@ package org.apache.ignite.util;
 
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.ThreadLocalRandom;
+import javax.cache.Cache;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cache.CacheAtomicityMode;
 import org.apache.ignite.cache.CacheWriteSynchronizationMode;
 import org.apache.ignite.cache.QueryEntity;
 import org.apache.ignite.cache.QueryIndex;
 import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cache.query.ScanQuery;
 import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.IgniteCacheOffheapManager;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
+import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
+import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
+import org.apache.ignite.internal.processors.cache.tree.SearchRow;
+import org.apache.ignite.internal.processors.query.GridQueryProcessor;
+import org.apache.ignite.internal.util.lang.GridIterator;
 import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.U;
 
 import static org.apache.ignite.internal.commandline.CommandHandler.EXIT_CODE_OK;
 
@@ -38,36 +53,202 @@ import static org.apache.ignite.internal.commandline.CommandHandler.EXIT_CODE_OK
  */
 public class GridCommandHandlerIndexingTest extends GridCommandHandlerTest {
     /**
-     *
+     * Tests that validation doesn't fail if nothing is broken.
      */
-    public void testValidateIndexes() throws Exception {
+    public void testValidateIndexesNoErrors() throws Exception {
         Ignite ignite = startGrids(2);
 
         ignite.cluster().active(true);
 
         Ignite client = startGrid("client");
 
-        IgniteCache<Integer, Person> personCache = client.getOrCreateCache(new CacheConfiguration<Integer,
Person>()
-            .setName("persons-cache-vi")
-            .setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC)
-            .setAtomicityMode(CacheAtomicityMode.ATOMIC)
-            .setBackups(1)
-            .setQueryEntities(F.asList(personEntity(true, true)))
-            .setAffinity(new RendezvousAffinityFunction(false, 32)));
+        String cacheName = "persons-cache-vi";
+
+        IgniteCache<Integer, Person> personCache = createPersonCache(client, cacheName);
 
         ThreadLocalRandom rand = ThreadLocalRandom.current();
 
-        for (int i = 0; i < 1000; i++)
+        for (int i = 0; i < 10_000; i++)
             personCache.put(i, new Person(rand.nextInt(), String.valueOf(rand.nextLong())));
 
         injectTestSystemOut();
 
-        assertEquals(EXIT_CODE_OK, execute("--cache", "validate_indexes", "persons-cache-vi"));
+        assertEquals(EXIT_CODE_OK, execute("--cache", "validate_indexes", cacheName));
 
         assertTrue(testOut.toString().contains("validate_indexes has finished, no issues
found"));
     }
 
     /**
+     * Tests that missing rows in CacheDataTree are detected.
+     */
+    public void testBrokenCacheDataTreeShouldFailValidation() throws Exception {
+        Ignite ignite = startGrids(2);
+
+        ignite.cluster().active(true);
+
+        Ignite client = startGrid("client");
+
+        String cacheName = "persons-cache-vi";
+
+        IgniteCache<Integer, Person> personCache = createPersonCache(client, cacheName);
+
+        ThreadLocalRandom rand = ThreadLocalRandom.current();
+
+        for (int i = 0; i < 10_000; i++)
+            personCache.put(i, new Person(rand.nextInt(), String.valueOf(rand.nextLong())));
+
+        breakCacheDataTree(ignite, cacheName, 1);
+
+        injectTestSystemOut();
+
+        assertEquals(EXIT_CODE_OK, execute("--cache", "validate_indexes", cacheName));
+
+        assertTrue(testOut.toString().contains("validate_indexes has finished with errors"));
+    }
+
+    /**
+     * Tests that missing rows in H2 indexes are detected.
+     */
+    public void testBrokenSqlIndexShouldFailValidation() throws Exception {
+        Ignite ignite = startGrids(2);
+
+        ignite.cluster().active(true);
+
+        Ignite client = startGrid("client");
+
+        String cacheName = "persons-cache-vi";
+
+        IgniteCache<Integer, Person> personCache = createPersonCache(client, cacheName);
+
+        ThreadLocalRandom rand = ThreadLocalRandom.current();
+
+        for (int i = 0; i < 10_000; i++)
+            personCache.put(i, new Person(rand.nextInt(), String.valueOf(rand.nextLong())));
+
+        breakSqlIndex(ignite, cacheName);
+
+        injectTestSystemOut();
+
+        assertEquals(EXIT_CODE_OK, execute("--cache", "validate_indexes", cacheName));
+
+        assertTrue(testOut.toString().contains("validate_indexes has finished with errors"));
+    }
+
+    /**
+     * Removes some entries from a partition skipping index update. This effectively breaks
the index.
+     */
+    private void breakCacheDataTree(Ignite ig, String cacheName, int partId) {
+        IgniteEx ig0 = (IgniteEx)ig;
+        int cacheId = CU.cacheId(cacheName);
+
+        ScanQuery scanQry = new ScanQuery(partId);
+
+        GridCacheContext<Object, Object> ctx = ig0.context().cache().context().cacheContext(cacheId);
+
+        // Get current update counter
+        String grpName = ig0.context().cache().context().cacheContext(cacheId).config().getGroupName();
+        int cacheGrpId = grpName == null ? cacheName.hashCode() : grpName.hashCode();
+
+        GridDhtLocalPartition locPart = ctx.dht().topology().localPartition(partId);
+        IgniteCacheOffheapManager.CacheDataStore dataStore = ig0.context().cache().context().cache().cacheGroup(cacheGrpId).offheap().dataStore(locPart);
+
+        Iterator<Cache.Entry> it = ig.cache(cacheName).withKeepBinary().query(scanQry).iterator();
+
+        for (int i = 0; i < 5_000; i++) {
+            if (it.hasNext()) {
+                Cache.Entry entry = it.next();
+
+                if (i % 5 == 0) {
+                    // Do update
+                    GridCacheDatabaseSharedManager db = (GridCacheDatabaseSharedManager)ig0.context().cache().context().database();
+
+                    db.checkpointReadLock();
+
+                    try {
+                        IgniteCacheOffheapManager.CacheDataStore innerStore = U.field(dataStore,
"delegate");
+
+                        // IgniteCacheOffheapManagerImpl.CacheDataRowStore
+                        Object rowStore = U.field(innerStore, "rowStore");
+
+                        // IgniteCacheOffheapManagerImpl.CacheDataTree
+                        Object dataTree = U.field(innerStore, "dataTree");
+
+                        CacheDataRow oldRow = U.invoke(
+                            dataTree.getClass(),
+                            dataTree,
+                            "remove",
+                            new SearchRow(cacheId, ctx.toCacheKeyObject(entry.getKey())));
+
+                        if (oldRow != null)
+                            U.invoke(rowStore.getClass(), rowStore, "removeRow", oldRow.link());
+                    }
+                    catch (IgniteCheckedException e) {
+                        System.out.println("Failed to remove key skipping indexes: " + entry);
+
+                        e.printStackTrace();
+                    }
+                    finally {
+                        db.checkpointReadUnlock();
+                    }
+                }
+            }
+            else {
+                System.out.println("Early exit for index corruption, keys processed: " +
i);
+
+                break;
+            }
+        }
+    }
+
+    /**
+     * Removes some entries from H2 trees skipping partition updates. This effectively breaks
the index.
+     */
+    private void breakSqlIndex(Ignite ig, String cacheName) throws Exception {
+        GridQueryProcessor qry = ((IgniteEx)ig).context().query();
+
+        GridCacheContext<Object, Object> ctx = ((IgniteEx)ig).cachex(cacheName).context();
+
+        GridDhtLocalPartition locPart = ctx.topology().localPartitions().get(0);
+
+        GridIterator<CacheDataRow> it = ctx.group().offheap().partitionIterator(locPart.id());
+
+        for (int i = 0; i < 500; i++) {
+            if (!it.hasNextX()) {
+                System.out.println("Early exit for index corruption, keys processed: " +
i);
+
+                break;
+            }
+
+            CacheDataRow row = it.nextX();
+
+            ctx.shared().database().checkpointReadLock();
+
+            try {
+                qry.remove(ctx, row);
+            }
+            finally {
+                ctx.shared().database().checkpointReadUnlock();
+            }
+        }
+    }
+
+    /**
+     * Dynamically creates cache with SQL indexes.
+     *
+     * @param ig Client.
+     * @param cacheName Cache name.
+     */
+    private IgniteCache<Integer, Person> createPersonCache(Ignite ig, String cacheName)
{
+        return ig.getOrCreateCache(new CacheConfiguration<Integer, Person>()
+            .setName(cacheName)
+            .setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC)
+            .setAtomicityMode(CacheAtomicityMode.ATOMIC)
+            .setBackups(1)
+            .setQueryEntities(F.asList(personEntity(true, true)))
+            .setAffinity(new RendezvousAffinityFunction(false, 32)));
+    }
+
+    /**
      * @param idxName Index name.
      * @param idxOrgId Index org id.
      */


Mime
View raw message