Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 5795317A03 for ; Thu, 18 Jun 2015 19:49:40 +0000 (UTC) Received: (qmail 42150 invoked by uid 500); 18 Jun 2015 19:49:35 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 42119 invoked by uid 500); 18 Jun 2015 19:49:35 -0000 Mailing-List: contact commits-help@ignite.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.incubator.apache.org Delivered-To: mailing list commits@ignite.incubator.apache.org Received: (qmail 42110 invoked by uid 99); 18 Jun 2015 19:49:35 -0000 Received: from Unknown (HELO spamd1-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 18 Jun 2015 19:49:35 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd1-us-west.apache.org (ASF Mail Server at spamd1-us-west.apache.org) with ESMTP id C5F3DCEC9F for ; Thu, 18 Jun 2015 19:49:34 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd1-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: 1.512 X-Spam-Level: * X-Spam-Status: No, score=1.512 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-0.269, URIBL_BLOCKED=0.001] autolearn=disabled Received: from mx1-us-east.apache.org ([10.40.0.8]) by localhost (spamd1-us-west.apache.org [10.40.0.7]) (amavisd-new, port 10024) with ESMTP id LNSGZPtALWTa for ; Thu, 18 Jun 2015 19:49:28 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-us-east.apache.org (ASF Mail Server at mx1-us-east.apache.org) with SMTP id 8F814455D8 for ; Thu, 18 Jun 2015 19:49:27 +0000 (UTC) Received: (qmail 34717 invoked by uid 99); 18 Jun 2015 19:48:11 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 18 Jun 2015 19:48:11 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id A3DD5E3C9A; Thu, 18 Jun 2015 19:48:11 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sboikov@apache.org To: commits@ignite.incubator.apache.org Date: Thu, 18 Jun 2015 19:48:19 -0000 Message-Id: <2ef5f21e289b4742863a9c1bed5736eb@git.apache.org> In-Reply-To: <411c4853d96a433eaaf919fef45d8aae@git.apache.org> References: <411c4853d96a433eaaf919fef45d8aae@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [09/50] incubator-ignite git commit: ignite-484-1 - group partition reservation ignite-484-1 - group partition reservation Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/1efefbd9 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/1efefbd9 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/1efefbd9 Branch: refs/heads/ignite-980 Commit: 1efefbd9497fdf31c36ff27634a5e13d5c74ea9a Parents: ef50a38 Author: S.Vladykin Authored: Thu Jun 11 04:02:29 2015 +0300 Committer: S.Vladykin Committed: Thu Jun 11 04:02:29 2015 +0300 ---------------------------------------------------------------------- .../distributed/dht/GridDhtLocalPartition.java | 58 +++++- .../dht/GridDhtPartitionsReservation.java | 181 +++++++++++++++++++ .../cache/distributed/dht/GridReservable.java | 35 ++++ .../query/h2/twostep/GridMapQueryExecutor.java | 144 +++++++++++---- 4 files changed, 374 insertions(+), 44 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1efefbd9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java index dc4982e..e858e42 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java @@ -47,7 +47,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh /** * Key partition. */ -public class GridDhtLocalPartition implements Comparable { +public class GridDhtLocalPartition implements Comparable, GridReservable { /** Maximum size for delete queue. */ public static final int MAX_DELETE_QUEUE_SIZE = Integer.getInteger(IGNITE_ATOMIC_CACHE_DELETE_HISTORY_SIZE, 200_000); @@ -63,7 +63,7 @@ public class GridDhtLocalPartition implements Comparable /** State. */ @GridToStringExclude - private AtomicStampedReference state = + private final AtomicStampedReference state = new AtomicStampedReference<>(MOVING, 0); /** Rent future. */ @@ -94,7 +94,10 @@ public class GridDhtLocalPartition implements Comparable private final LongAdder8 mapPubSize = new LongAdder8(); /** Remove queue. */ - private GridCircularBuffer> rmvQueue; + private final GridCircularBuffer> rmvQueue; + + /** Group reservations. */ + private final CopyOnWriteArrayList reservations = new CopyOnWriteArrayList<>(); /** * @param cctx Context. @@ -131,6 +134,31 @@ public class GridDhtLocalPartition implements Comparable } /** + * Adds group reservation to this partition. + * + * @param r Reservation. + * @return {@code true} If reservation added successfully. + */ + public boolean addReservation(GridDhtPartitionsReservation r) { + assert state.getReference() != EVICTED : "we can reserve only active partitions"; + assert state.getStamp() != 0 : "partition must be already reserved before adding group reservation"; + + if (!reservations.addIfAbsent(r)) + return false; + + r.register(this); + + return true; + } + + /** + * @param r Reservation. + */ + public void removeReservation(GridDhtPartitionsReservation r) { + reservations.remove(r); + } + + /** * @return Partition ID. */ public int id() { @@ -334,7 +362,7 @@ public class GridDhtLocalPartition implements Comparable * * @return {@code True} if reserved. */ - public boolean reserve() { + @Override public boolean reserve() { while (true) { int reservations = state.getStamp(); @@ -351,7 +379,7 @@ public class GridDhtLocalPartition implements Comparable /** * Releases previously reserved partition. */ - public void release() { + @Override public void release() { while (true) { int reservations = state.getStamp(); @@ -441,7 +469,7 @@ public class GridDhtLocalPartition implements Comparable * @param updateSeq Update sequence. * @return Future for evict attempt. */ - private IgniteInternalFuture tryEvictAsync(boolean updateSeq) { + IgniteInternalFuture tryEvictAsync(boolean updateSeq) { if (map.isEmpty() && !GridQueryProcessor.isEnabled(cctx.config()) && state.compareAndSet(RENTING, EVICTED, 0, 0)) { if (log.isDebugEnabled()) @@ -471,12 +499,26 @@ public class GridDhtLocalPartition implements Comparable } /** + * @return {@code true} If there is a group reservation. + */ + private boolean groupReserved() { + boolean reserved = false; + + for (GridDhtPartitionsReservation reservation : reservations) { + if (!reservation.canEvict()) + reserved = true; + } + + return reserved; + } + + /** * @param updateSeq Update sequence. * @return {@code True} if entry has been transitioned to state EVICTED. */ - private boolean tryEvict(boolean updateSeq) { + boolean tryEvict(boolean updateSeq) { // Attempt to evict partition entries from cache. - if (state.getReference() == RENTING && state.getStamp() == 0) + if (state.getReference() == RENTING && state.getStamp() == 0 && !groupReserved()) clearAll(); if (map.isEmpty() && state.compareAndSet(RENTING, EVICTED, 0, 0)) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1efefbd9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsReservation.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsReservation.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsReservation.java new file mode 100644 index 0000000..fcd6088 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsReservation.java @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.dht; + +import org.apache.ignite.internal.processors.affinity.*; +import org.apache.ignite.internal.processors.cache.*; +import org.apache.ignite.lang.*; + +import java.util.*; +import java.util.concurrent.atomic.*; + +import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.*; + +/** + * Reservation mechanism for multiple partitions allowing to do a reservation in one operation. + */ +public class GridDhtPartitionsReservation implements GridReservable { + /** */ + private final GridCacheContext cctx; + + /** */ + private final AffinityTopologyVersion topVer; + + /** */ + private final List parts = new ArrayList<>(); + + /** */ + private final AtomicInteger reservations = new AtomicInteger(); + + /** */ + private final IgniteInClosure finalize; + + /** + * @param topVer AffinityTopologyVersion version. + * @param cctx Cache context. + * @param finalize Finalizing closure. + */ + public GridDhtPartitionsReservation( + AffinityTopologyVersion topVer, + GridCacheContext cctx, + IgniteInClosure finalize) { + assert topVer != null; + assert cctx != null; + + this.topVer = topVer; + this.cctx = cctx; + this.finalize = finalize; + } + + /** + * @return Topology version. + */ + public AffinityTopologyVersion topologyVersion() { + return topVer; + } + + /** + * @return Cache context. + */ + public GridCacheContext cacheContext() { + return cctx; + } + + /** + * Registers partition for this group reservation. + * + * @param part Partition. + */ + public void register(GridDhtLocalPartition part) { + parts.add(part); + } + + /** + * Reserves all the registered partitions. + * + * @return {@code true} If succeeded. + */ + @Override public boolean reserve() { + for (;;) { + int r = reservations.get(); + + if (r == -1) // Invalidated by successful canEvict call. + return false; + + assert r >= 0 : r; + + if (reservations.compareAndSet(r, r + 1)) + return true; + } + } + + /** + * Releases all the registered partitions. + */ + @Override public void release() { + for (;;) { + int r = reservations.get(); + + if (r <= 0) + throw new IllegalStateException("Method 'reserve' must be called before 'release'."); + + if (reservations.compareAndSet(r, r - 1)) { + // If it was the last reservation and topology version changed -> attempt to evict partitions. + if (r == 1 && !topVer.equals(cctx.topology().topologyVersion())) { + for (GridDhtLocalPartition part : parts) { + if (part.state() == RENTING) + part.tryEvictAsync(true); + } + } + + return; + } + } + } + + /** + * Must be checked in {@link GridDhtLocalPartition#tryEvict(boolean)}. + * If returns {@code true} then probably partition will be evicted (or at least cleared), + * so this reservation object becomes invalid and must be dropped from the partition. + * Also this means that after returning {@code true} here method {@link #reserve()} can not + * return {@code true} anymore. + * + * @return {@code true} If this reservation is NOT reserved and partition CAN be evicted. + */ + public boolean canEvict() { + int r = reservations.get(); + + assert r >= -1 : r; + + if (r != 0) + return r == -1; + + if (reservations.compareAndSet(0, -1)) { + // Remove our self. + for (GridDhtLocalPartition part : parts) + part.removeReservation(this); + + if (finalize != null) + finalize.apply(this); + + return true; + } + + return false; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (o == null || getClass() != o.getClass()) + return false; + + GridDhtPartitionsReservation that = (GridDhtPartitionsReservation)o; + + return topVer.equals(that.topVer) && cctx == that.cctx; + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + String cache = cctx.name(); + + return 31 * topVer.hashCode() + cache == null ? 0 : cache.hashCode(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1efefbd9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridReservable.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridReservable.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridReservable.java new file mode 100644 index 0000000..326b077 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridReservable.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.dht; + +/** + * Reservations support. + */ +public interface GridReservable { + /** + * Reserves. + * + * @return {@code true} If reserved successfully. + */ + public boolean reserve(); + + /** + * Releases. + */ + public void release(); +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1efefbd9/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java index d9e9066..a8bc6e0 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java @@ -87,6 +87,10 @@ public class GridMapQueryExecutor { /** */ private final GridSpinBusyLock busyLock; + /** */ + private final ConcurrentMap, GridReservable> reservations = + new ConcurrentHashMap8<>(); + /** * @param busyLock Busy lock. */ @@ -202,15 +206,13 @@ public class GridMapQueryExecutor { /** * @param cacheName Cache name. - * @param topVer Topology version. * @return Cache context or {@code null} if none. */ - @Nullable private GridCacheContext cacheContext(String cacheName, AffinityTopologyVersion topVer) { + @Nullable private GridCacheContext cacheContext(String cacheName) { GridCacheAdapter cache = ctx.cache().internalCache(cacheName); - if (cache == null) // Since we've waited for for cache affinity updates, this must be a misconfiguration. - throw new CacheException("Cache does not exist on current node: [nodeId=" + ctx.localNodeId() + - ", cache=" + cacheName + ", topVer=" + topVer + "]"); + if (cache == null) + return null; return cache.context(); } @@ -218,17 +220,23 @@ public class GridMapQueryExecutor { /** * @param cacheNames Cache names. * @param topVer Topology version. - * @param parts Explicit partitions. + * @param explicitParts Explicit partitions list. * @param reserved Reserved list. * @return {@code true} If all the needed partitions successfully reserved. * @throws IgniteCheckedException If failed. */ - private boolean reservePartitions(Collection cacheNames, AffinityTopologyVersion topVer, final int[] parts, - List reserved) throws IgniteCheckedException { - Collection partIds = parts == null ? null : wrap(parts); + private boolean reservePartitions( + Collection cacheNames, + AffinityTopologyVersion topVer, + final int[] explicitParts, + List reserved + ) throws IgniteCheckedException { + assert topVer != null; + + Collection partIds = wrap(explicitParts); for (String cacheName : cacheNames) { - GridCacheContext cctx = cacheContext(cacheName, topVer); + GridCacheContext cctx = cacheContext(cacheName); if (cctx == null) // Cache was not found, probably was not deployed yet. return false; @@ -236,35 +244,75 @@ public class GridMapQueryExecutor { if (cctx.isLocal()) continue; - int partsCnt = cctx.affinity().partitions(); + final T2 grpKey = new T2<>(cctx.name(), topVer); - if (cctx.isReplicated()) { // Check all the partitions are in owning state for replicated cache. - for (int p = 0; p < partsCnt; p++) { - GridDhtLocalPartition part = cctx.topology().localPartition(p, topVer, false); + GridReservable r = reservations.get(grpKey); - if (part == null || part.state() != OWNING) - return false; + if (explicitParts == null && r != null) { // Try to reserve group partition if any and no explicits. + if (!r.reserve()) + return false; // We need explicit partitions here -> retry. - // We don't need to reserve partitions because they will not be evicted in replicated caches. - } + reserved.add(r); } - else { // Reserve primary partitions for partitioned cache. - if (parts == null) - partIds = cctx.affinity().primaryPartitions(ctx.localNodeId(), topVer); + else { // Try to reserve partitions one by one. + int partsCnt = cctx.affinity().partitions(); - for (int partId : partIds) { - if (partId >= partsCnt) - break; // We can have more partitions because `parts` array is shared for all caches. + if (cctx.isReplicated()) { // Check all the partitions are in owning state for replicated cache. + if (r == null) { // Check only once. + for (int p = 0; p < partsCnt; p++) { + GridDhtLocalPartition part = cctx.topology().localPartition(p, topVer, false); - GridDhtLocalPartition part = cctx.topology().localPartition(partId, topVer, false); + // We don't need to reserve partitions because they will not be evicted in replicated caches. + if (part == null || part.state() != OWNING) + return false; - if (part == null || part.state() != OWNING || !part.reserve()) - return false; + // Mark that we checked this replicated cache. + reservations.putIfAbsent(grpKey, ReplicatedReservation.INSTANCE); + } + } + } + else { // Reserve primary partitions for partitioned cache (if no explicit given). + if (explicitParts == null) + partIds = cctx.affinity().primaryPartitions(ctx.localNodeId(), topVer); - reserved.add(part); + for (int partId : partIds) { + GridDhtLocalPartition part = cctx.topology().localPartition(partId, topVer, false); - if (part.state() != OWNING) - return false; + if (part == null || part.state() != OWNING || !part.reserve()) + return false; + + reserved.add(part); + + // Double check that we are still in owning state and partition contents are not cleared. + if (part.state() != OWNING) + return false; + } + + if (explicitParts == null) { + // We reserved all the primary partitions for cache, attempt to add group reservation. + GridDhtPartitionsReservation reservation = new GridDhtPartitionsReservation(topVer, cctx, + new CI1() { + @Override public void apply(GridDhtPartitionsReservation r) { + reservations.remove(grpKey, r); + } + }); + + for (int p = reserved.size() - partIds.size(); p < reserved.size(); p++) { + if (!((GridDhtLocalPartition)reserved.get(p)).addReservation(reservation)) { + // Can fail to add only on the first partition because of the same order of partitions. + assert p == reserved.size() - partIds.size() : p; + + reservation = null; + + break; + } + } + + if (reservation != null) { // If we were able to add reservation to all partitions, publish it. + if (reservations.putIfAbsent(grpKey, reservation) != null) + throw new IllegalStateException(); + } + } } } } @@ -277,7 +325,10 @@ public class GridMapQueryExecutor { * @return Collection wrapper. */ private static Collection wrap(final int[] ints) { - if (F.isEmpty(ints)) + if (ints == null) + return null; + + if (ints.length == 0) return Collections.emptySet(); return new AbstractCollection() { @@ -317,7 +368,7 @@ public class GridMapQueryExecutor { QueryResults qr = null; - List reserved = new ArrayList<>(); + List reserved = new ArrayList<>(); try { // Unmarshall query params. @@ -343,7 +394,7 @@ public class GridMapQueryExecutor { final AffinityTopologyVersion topVer = req.topologyVersion(); if (topVer != null) { - // Reserve primary partitions. + // Reserve primary for topology version or explicit partitions. if (!reservePartitions(caches, topVer, req.partitions(), reserved)) { sendRetry(node, req.requestId()); @@ -352,7 +403,10 @@ public class GridMapQueryExecutor { } // Prepare to run queries. - GridCacheContext mainCctx = cacheContext(req.space(), topVer); + GridCacheContext mainCctx = cacheContext(req.space()); + + if (mainCctx == null) + throw new CacheException("Cache was destroyed: " + req.space()); qr = new QueryResults(req.requestId(), qrys.size(), mainCctx); @@ -420,8 +474,8 @@ public class GridMapQueryExecutor { h2.setFilters(null); // Release reserved partitions. - for (GridDhtLocalPartition part : reserved) - part.release(); + for (GridReservable r : reserved) + r.release(); } } @@ -738,4 +792,22 @@ public class GridMapQueryExecutor { U.close(stmt, log); } } + + /** + * Fake reservation object for replicated caches. + */ + private static class ReplicatedReservation implements GridReservable { + /** */ + static final ReplicatedReservation INSTANCE = new ReplicatedReservation(); + + /** {@inheritDoc} */ + @Override public boolean reserve() { + return true; + } + + /** {@inheritDoc} */ + @Override public void release() { + // No-op. + } + } }