ignite-notifications mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] [ignite] nizhikov commented on a change in pull request #6982: IGNITE-12295: Index partition purge for file-based rebalancing
Date Mon, 21 Oct 2019 15:14:54 GMT
nizhikov commented on a change in pull request #6982: IGNITE-12295: Index partition purge for
file-based rebalancing
URL: https://github.com/apache/ignite/pull/6982#discussion_r337076921
 
 

 ##########
 File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/PartitionsEvictManager.java
 ##########
 @@ -791,10 +951,312 @@ private int calculateBucket() {
             switch (QUEUE_TYPE) {
                 case 1:
                     return new PriorityBlockingQueue<>(
-                        1000, Comparator.comparingLong(p -> p.part.fullSize()));
+                        1000,
+                        Comparator.comparing((AbstractEvictionTask t) -> t.id.type.ordinal())
+                            .reversed()
+                            .thenComparing((AbstractEvictionTask t) -> t.part == null
? 0 : t.part.fullSize()));
                 default:
                     return new LinkedBlockingQueue<>();
             }
         }
     }
+
+    /**
+     * Context of exclusive partition purge.
+     */
+    private class ExclusivePurgeFuture extends GridFutureAdapter<Void> {
+        /** Group eviction context. */
+        GroupEvictionContext grpEvictionCtx;
+
+        /** Partition ids. */
+        Set<Integer> partIds;
+
+        /** Partitions. */
+        List<GridDhtLocalPartition> parts;
+
+        /** All partitions initialized counter. */
+        AtomicInteger initCounter;
+
+        /** */
+        AtomicInteger resultCounter;
+
+        /** Tasks future */
+        GridCompoundFuture<Void, Void> resFut = new GridCompoundFuture<>();
+
+        /** Allows to cancel index tasks. */
+        GridCompoundFuture<Void, Void> cancelFut = new GridCompoundFuture<>();
+
+        /**
+         *
+         * @param grpEvictionCtx Group eviction context.
+         * @param parts Partitions.
+         */
+        public ExclusivePurgeFuture(GroupEvictionContext grpEvictionCtx, List<GridDhtLocalPartition>
parts) {
+            this.grpEvictionCtx = grpEvictionCtx;
+            this.parts = parts;
+
+            partIds = Collections.unmodifiableSet(
+                parts.stream().mapToInt(GridDhtLocalPartition::id).boxed().collect(Collectors.toSet()));
+
+            initCounter = new AtomicInteger(parts.size());
+
+            resultCounter = new AtomicInteger(parts.size());
+        }
+
+        /**
+         * Initializes partition for exclusive purge.
+         *
+         * @param part Partition.
+         */
+        public void onInit(GridDhtLocalPartition part) {
+            if (log.isInfoEnabled())
+                log.info("Partition confirmed exclusive purge. [partId=" + part.id() + "]");
+
+            part.onClearFinished(f -> {
+                if (resultCounter.decrementAndGet() == 0) {
+                    assert resFut.isDone();
+
+                    if (resFut.error() == null)
+                        onDone();
+                    else
+                        onDone(resFut.error());
+                }
+            });
+
+            if (initCounter.decrementAndGet() == 0)
+                initTasks();
+        }
+
+        /**
+         * Initializes tasks.
+         */
+        private void initTasks() {
+            List<AbstractEvictionTask> tasks = new ArrayList<>();
+            List<AbstractEvictionTask> idxTasks = new ArrayList<>();
+
+            for (GridDhtLocalPartition p : parts) {
+                ClearOnheapEntriesTask task = new ClearOnheapEntriesTask(grpEvictionCtx,
p);
+
+                tasks.add(task);
+
+                resFut.add(task.finishFut);
+            }
+
+            CacheGroupContext grp = grpEvictionCtx.grp;
+
+            GridCompoundFuture<Void, Void> idxFut = null;
+
+            if (grp.shared().kernalContext().query().moduleEnabled()) {
+                GridQueryIndexing idx = grp.shared().kernalContext().query().getIndexing();
+
+                GridQueryRowCacheCleaner rowCacheCleaner = idx.rowCacheCleaner(grp.groupId());
+
+                if (rowCacheCleaner != null) {
+                    RowCacheCleanerTask task = new RowCacheCleanerTask(grpEvictionCtx, rowCacheCleaner,
partIds);
+
+                    tasks.add(task);
+
+                    resFut.add(task.finishFut);
+                }
+
+                List<IgniteBiTuple<Runnable, IgniteInternalFuture<Void>>>
idxPurgeList =
+                    idx.purgeIndexPartitions(grp, partIds);
+
+                assert idxPurgeList != null;
+
+                if (!idxPurgeList.isEmpty()) {
+                    idxFut = new GridCompoundFuture<>();
+
+                    for (IgniteBiTuple<Runnable, IgniteInternalFuture<Void>>
r : idxPurgeList) {
+                        AbstractEvictionTask idxTask = new IndexPurgeTask(grpEvictionCtx,
r.get1(), r.get2());
+
+                        cancelFut.add(r.get2());
+
+                        idxFut.add(idxTask.finishFut);
+
+                        idxTasks.add(idxTask);
+                    }
+
+                    cancelFut.markInitialized();
+
+                    idxFut.markInitialized();
+
+                    resFut.add(idxFut);
+                }
+            }
+
+            resFut.markInitialized();
+
+            if (log.isInfoEnabled())
+                log.info("Exclusive partition purge: scheduling tasks." +
+                    " [total=" + (tasks.size() + idxTasks.size()) + ", idx=" + idxTasks.size()
+ "]");
+
+            if (idxFut == null) {
+                for (AbstractEvictionTask r0 : tasks)
+                    evictionQueue.offer(r0);
+
+                grpEvictionCtx.totalTasks.addAndGet(tasks.size());
+            }
+            else {
+                idxFut.listen(f -> {
+                    if (f.error() == null && !grpEvictionCtx.shouldStop()) {
+                        for (AbstractEvictionTask r0 : tasks)
+                            evictionQueue.offer(r0);
+
+                        grpEvictionCtx.totalTasks.addAndGet(tasks.size());
+
+                        scheduleNextTask(0);
+                    }
+                });
+
+                for (AbstractEvictionTask r0 : idxTasks)
+                    evictionQueue.offer(r0);
+
+                grpEvictionCtx.totalTasks.addAndGet(idxTasks.size());
+            }
+
+            scheduleNextTask(0);
+        }
+
+        /**
+         * Cancels index purge tasks.
+         */
+        public void stop() {
+            if (log.isInfoEnabled())
+                log.info("Exclusive purge : stop");
+
+            resFut.onDone(new IgniteCheckedException("Cache is stopping."));
+
+            try {
+                cancelFut.cancel();
+            }
+            catch (IgniteCheckedException e) {
+                log.error("Error cancelling index purge tasks", e);
+            }
+        }
+    }
+
+    /**
+     * Purges partitions from single index.
+     */
+    private class IndexPurgeTask extends AbstractEvictionTask {
+        /** Actual worker that removes rows from the index. */
+        Runnable runnable;
+
+        /** Future internal to the worker. */
+        IgniteInternalFuture<?> fut;
+
+        /**
+         *
+         * @param grpEvictionCtx Group eviction context.
+         * @param runnable Actual worker that removes rows from the index.
+         * @param fut Future internal to the worker.
+         */
+        IndexPurgeTask(GroupEvictionContext grpEvictionCtx, Runnable runnable, IgniteInternalFuture<?>
fut) {
+            super(null, grpEvictionCtx, TaskType.INDEX_PURGE);
+
+            this.runnable = runnable;
+
+            this.fut = fut;
+
+            fut.listen(f -> {
+                if (f.error() == null)
+                    finishFut.onDone();
+                else
+                    finishFut.onDone(f.error());
+            });
+        }
+
+        /** {@inheritDoc} */
+        @Override boolean run0() {
+            runnable.run();
+
+            return true;
+        }
+
+        /** {@inheritDoc} */
+        @Override void scheduleRetry() {
+            // No-op.
+        }
+    }
+
+    /**
+     * Removes links belonging to certain partitions from H2RowCache.
+     */
+    private class RowCacheCleanerTask extends AbstractEvictionTask {
 
 Review comment:
   Let's use consistent naming and rename it to `RowCachePurgeTask` or something similar.
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message