ignite-notifications mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] [ignite] nizhikov commented on a change in pull request #5656: IGNITE-10663 Read Repair
Date Tue, 11 Jun 2019 15:24:02 GMT
nizhikov commented on a change in pull request #5656: IGNITE-10663 Read Repair
URL: https://github.com/apache/ignite/pull/5656#discussion_r292516033
 
 

 ##########
 File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
 ##########
 @@ -4802,48 +4892,211 @@ protected V get0(
      * @return Map of cached values.
      * @throws IgniteCheckedException If read failed.
      */
-    protected Map<K, V> getAll0(Collection<? extends K> keys, boolean deserializeBinary,
-        boolean needVer) throws IgniteCheckedException {
-        checkJta();
+    protected Map<K, V> repairableGetAll(
+        Collection<? extends K> keys,
+        boolean deserializeBinary,
+        boolean needVer,
+        boolean recovery,
+        boolean readRepair) throws IgniteCheckedException {
+        try {
+            return getAll(keys, deserializeBinary, needVer, recovery, readRepair);
+        }
+        catch (IgniteConsistencyViolationException e) {
+            repairAsync(keys, ctx.operationContextPerCall(), false).get();
 
-        String taskName = ctx.kernalContext().job().currentTaskName();
+            return repairableGetAll(keys, deserializeBinary, needVer, recovery, readRepair);
+        }
+    }
 
-        CacheOperationContext opCtx = ctx.operationContextPerCall();
+    /**
+     * @param keys Keys.
+     * @param deserializeBinary Deserialize binary flag.
+     * @param needVer Need version.
+     * @return Map of cached values.
+     * @throws IgniteCheckedException If read failed.
+     */
+    protected Map<K, V> getAll(
+        Collection<? extends K> keys,
+        boolean deserializeBinary,
+        boolean needVer,
+        boolean recovery,
+        boolean readRepair) throws IgniteCheckedException {
+        checkJta();
 
         return getAllAsync(keys,
             !ctx.config().isReadFromBackup(),
             /*skip tx*/false,
             /*subject id*/null,
-            taskName,
+            ctx.kernalContext().job().currentTaskName(),
             deserializeBinary,
-            opCtx != null && opCtx.recovery(),
+            recovery,
+            readRepair,
             /*skip vals*/false,
             needVer).get();
     }
 
     /**
      * @param keys Keys.
-     * @param deserializeBinary Deserialize binary flag.
+     * @param forcePrimary Force primary.
+     * @param skipTx Skip tx.
+     * @param subjId Subj Id.
+     * @param taskName Task name.
+     * @param deserializeBinary Deserialize binary.
+     * @param recovery Recovery mode flag.
+     * @param skipVals Skip values.
      * @param needVer Need version.
-     * @return Read future.
+     * @return Future for the get operation.
+     * @see GridCacheAdapter#getAllAsync(Collection)
      */
-    public IgniteInternalFuture<Map<K, V>> getAllAsync(
+    public IgniteInternalFuture<Map<K, V>> repairableGetAllAsync(
         @Nullable Collection<? extends K> keys,
+        boolean forcePrimary,
+        boolean skipTx,
+        @Nullable UUID subjId,
+        String taskName,
         boolean deserializeBinary,
         boolean recovery,
-        boolean needVer
+        boolean readRepair,
+        boolean skipVals,
+        final boolean needVer
     ) {
-        String taskName = ctx.kernalContext().job().currentTaskName();
-
-        return getAllAsync(keys,
-            !ctx.config().isReadFromBackup(),
-            /*skip tx*/false,
-            /*subject id*/null,
+        IgniteInternalFuture<Map<K, V>> fut = getAllAsync(
+            keys,
+            forcePrimary,
+            skipTx,
+            subjId,
             taskName,
             deserializeBinary,
             recovery,
-            /*skip vals*/false,
+            readRepair,
+            skipVals,
             needVer);
+
+        if (readRepair) {
+            CacheOperationContext opCtx = ctx.operationContextPerCall();
+
+            return getWithRepairAsync(
+                fut,
+                () -> repairAsync(keys, opCtx, skipVals),
+                () -> repairableGetAllAsync(
+                    keys,
+                    forcePrimary,
+                    skipTx,
+                    subjId,
+                    taskName,
+                    deserializeBinary,
+                    recovery,
+                    readRepair,
+                    skipVals,
+                    needVer));
+        }
+
+        return fut;
+    }
+
+    /**
+     * Performs repair and retries get.
+     */
+    private <R> IgniteInternalFuture<R> getWithRepairAsync(
 
 Review comment:
   Please, add Javadocs.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message