Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id ED42C200C79 for ; Thu, 13 Apr 2017 15:30:52 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id EBCD2160B8B; Thu, 13 Apr 2017 13:30:52 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 1BAA5160BBD for ; Thu, 13 Apr 2017 15:30:49 +0200 (CEST) Received: (qmail 50013 invoked by uid 500); 13 Apr 2017 13:30:49 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 49478 invoked by uid 99); 13 Apr 2017 13:30:49 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 13 Apr 2017 13:30:49 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id E7AE2E9112; Thu, 13 Apr 2017 13:30:48 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: agoncharuk@apache.org To: commits@ignite.apache.org Date: Thu, 13 Apr 2017 13:31:06 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [19/21] ignite git commit: ignite-4681 Apply new future adapter archived-at: Thu, 13 Apr 2017 13:30:53 -0000 ignite-4681 Apply new future adapter Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e922dda6 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e922dda6 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e922dda6 Branch: refs/heads/ignite-3477-master Commit: e922dda6e9230ff7715f83c7b81e5656e8e856a0 Parents: dd4a5c4 Author: Igor Seliverstov Authored: Thu Apr 13 15:41:54 2017 +0300 Committer: Yakov Zhdanov Committed: Thu Apr 13 15:42:50 2017 +0300 ---------------------------------------------------------------------- .../jmh/future/JmhFutureAdapterBenchmark.java | 145 ++++++ .../ignite/internal/IgniteInternalFuture.java | 15 - .../cache/GridCacheCompoundFuture.java | 63 +++ .../cache/GridCacheCompoundIdentityFuture.java | 63 +++ .../processors/cache/GridCacheFuture.java | 15 + .../cache/GridCacheFutureAdapter.java | 61 +++ .../distributed/GridCacheTxRecoveryFuture.java | 9 +- .../dht/CacheDistributedGetFutureAdapter.java | 4 +- .../distributed/dht/GridDhtLockFuture.java | 33 +- .../distributed/dht/GridDhtTxFinishFuture.java | 4 +- .../distributed/dht/GridDhtTxPrepareFuture.java | 16 +- .../dht/GridPartitionedSingleGetFuture.java | 4 +- .../GridDhtAtomicAbstractUpdateFuture.java | 4 +- .../GridNearAtomicAbstractUpdateFuture.java | 8 +- .../GridNearAtomicSingleUpdateFuture.java | 24 +- .../dht/atomic/GridNearAtomicUpdateFuture.java | 24 +- .../colocated/GridDhtColocatedLockFuture.java | 23 +- .../GridDhtPartitionsExchangeFuture.java | 35 +- .../distributed/near/GridNearLockFuture.java | 20 +- ...arOptimisticSerializableTxPrepareFuture.java | 2 +- .../near/GridNearOptimisticTxPrepareFuture.java | 9 +- .../GridNearPessimisticTxPrepareFuture.java | 5 +- .../near/GridNearTxFinishFuture.java | 13 +- .../cache/distributed/near/GridNearTxLocal.java | 2 +- .../near/GridNearTxPrepareFutureAdapter.java | 4 +- .../cache/local/GridLocalLockFuture.java | 6 +- .../query/GridCacheDistributedQueryFuture.java | 18 +- .../query/GridCacheQueryFutureAdapter.java | 31 +- .../cache/transactions/TxDeadlockDetection.java | 5 +- .../platform/compute/PlatformCompute.java | 10 - .../tcp/GridTcpMemcachedNioListener.java | 20 +- .../util/future/GridCompoundFuture.java | 45 +- .../util/future/GridFinishedFuture.java | 13 - .../internal/util/future/GridFutureAdapter.java | 479 ++++++++++--------- .../internal/util/future/IgniteFutureImpl.java | 10 - .../org/apache/ignite/lang/IgniteFuture.java | 15 - .../GridCacheOrderedPreloadingSelfTest.java | 48 +- .../util/future/IgniteFutureImplTest.java | 38 -- .../external/HadoopExternalTaskExecutor.java | 2 +- .../processors/schedule/ScheduleFutureImpl.java | 20 - 40 files changed, 810 insertions(+), 555 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/e922dda6/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/future/JmhFutureAdapterBenchmark.java ---------------------------------------------------------------------- diff --git a/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/future/JmhFutureAdapterBenchmark.java b/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/future/JmhFutureAdapterBenchmark.java new file mode 100644 index 0000000..ef3643a --- /dev/null +++ b/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/future/JmhFutureAdapterBenchmark.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.benchmarks.jmh.future; + +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.benchmarks.jmh.JmhAbstractBenchmark; +import org.apache.ignite.internal.benchmarks.jmh.runner.JmhIdeBenchmarkRunner; +import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.lang.IgniteInClosure; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.annotations.Threads; + +/** + * + */ +public class JmhFutureAdapterBenchmark extends JmhAbstractBenchmark { + /** */ + private static final IgniteInClosure> LSNR = new IgniteInClosure>() { + /** {@inheritDoc} */ + @Override public void apply(IgniteInternalFuture fut) { + // No-op + } + }; + + /** */ + private static final Long RES = 0L; + + /** + * + */ + @State(Scope.Thread) + public static class CompleteState { + /** */ + private final BlockingQueue> queue = new ArrayBlockingQueue<>(10); + + /** */ + private final Thread compleete = new Thread() { + + /** {@inheritDoc} */ + @Override public void run() { + while (!Thread.interrupted()) { + GridFutureAdapter fut = queue.poll(); + if (fut != null) + fut.onDone(RES); + } + } + }; + + /** + * + */ + @Setup public void setup() { + compleete.start(); + } + + /** + * @throws InterruptedException If failed. + */ + @TearDown public void destroy() throws InterruptedException { + compleete.interrupt(); + compleete.join(); + } + } + + /** + * @throws Exception If failed. + */ + @Benchmark + public void testSimpleGet() throws Exception { + GridFutureAdapter fut = new GridFutureAdapter<>(); + fut.onDone(RES); + fut.get(); + } + + /** + * @throws Exception If failed. + */ + @Benchmark + public void testSimpleGetWithListener() throws Exception { + GridFutureAdapter fut = new GridFutureAdapter<>(); + fut.listen(LSNR); + fut.onDone(RES); + fut.get(); + } + + /** + * @param state Benchmark context. + * @throws Exception If failed. + */ + @Benchmark + @Threads(4) + public void completeFutureGet(CompleteState state) throws Exception { + GridFutureAdapter fut = new GridFutureAdapter<>(); + state.queue.put(fut); + fut.get(); + } + + /** + * Run benchmarks. + * + * @param args Arguments. + * @throws Exception If failed. + */ + public static void main(String[] args) throws Exception { + run(8); + } + + /** + * Run benchmark. + * + * @param threads Amount of threads. + * @throws Exception If failed. + */ + private static void run(int threads) throws Exception { + JmhIdeBenchmarkRunner.create() + .forks(1) + .threads(threads) + .warmupIterations(30) + .measurementIterations(30) + .benchmarks(JmhFutureAdapterBenchmark.class.getSimpleName()) + .jvmArguments("-Xms4g", "-Xmx4g") + .run(); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/e922dda6/modules/core/src/main/java/org/apache/ignite/internal/IgniteInternalFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteInternalFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteInternalFuture.java index 789556d..76f8c71 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteInternalFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteInternalFuture.java @@ -103,21 +103,6 @@ public interface IgniteInternalFuture { public boolean isCancelled(); /** - * Gets start time for this future. - * - * @return Start time for this future. - */ - public long startTime(); - - /** - * Gets duration in milliseconds between start of the future and current time if future - * is not finished, or between start and finish of this future. - * - * @return Time in milliseconds this future has taken to execute. - */ - public long duration(); - - /** * Registers listener closure to be asynchronously notified whenever future completes. * * @param lsnr Listener closure to register. If not provided - this method is no-op. http://git-wip-us.apache.org/repos/asf/ignite/blob/e922dda6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheCompoundFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheCompoundFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheCompoundFuture.java new file mode 100644 index 0000000..9869d4a --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheCompoundFuture.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.internal.util.future.GridCompoundFuture; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteReducer; +import org.jetbrains.annotations.Nullable; + +/** + * + */ +public abstract class GridCacheCompoundFuture extends GridCompoundFuture implements GridCacheFuture { + /** Future start time. */ + private final long startTime = U.currentTimeMillis(); + + /** Future end time. */ + private volatile long endTime; + + /** + * @param rdc Reducer. + */ + protected GridCacheCompoundFuture(@Nullable IgniteReducer rdc) { + super(rdc); + } + + /** {@inheritDoc} */ + @Override public long startTime() { + return startTime; + } + + /** {@inheritDoc} */ + @Override public long duration() { + long endTime = this.endTime; + + return (endTime == 0 ? U.currentTimeMillis() : endTime) - startTime; + } + + /** {@inheritDoc} */ + @Override protected boolean onDone(@Nullable R res, @Nullable Throwable err, boolean cancel) { + if(super.onDone(res, err, cancel)){ + endTime = U.currentTimeMillis(); + return true; + } + + return false; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/e922dda6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheCompoundIdentityFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheCompoundIdentityFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheCompoundIdentityFuture.java new file mode 100644 index 0000000..8fd619a --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheCompoundIdentityFuture.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.internal.util.future.GridCompoundIdentityFuture; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteReducer; +import org.jetbrains.annotations.Nullable; + +/** + * + */ +public abstract class GridCacheCompoundIdentityFuture extends GridCompoundIdentityFuture implements GridCacheFuture { + /** Future start time. */ + private final long startTime = U.currentTimeMillis(); + + /** Future end time. */ + private volatile long endTime; + + /** + * @param rdc Reducer. + */ + protected GridCacheCompoundIdentityFuture(@Nullable IgniteReducer rdc) { + super(rdc); + } + + /** {@inheritDoc} */ + @Override public long startTime() { + return startTime; + } + + /** {@inheritDoc} */ + @Override public long duration() { + long endTime = this.endTime; + + return (endTime == 0 ? U.currentTimeMillis() : endTime) - startTime; + } + + /** {@inheritDoc} */ + @Override protected boolean onDone(@Nullable T res, @Nullable Throwable err, boolean cancel) { + if(super.onDone(res, err, cancel)){ + endTime = U.currentTimeMillis(); + return true; + } + + return false; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/e922dda6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheFuture.java index 8bf8d40..90a219a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheFuture.java @@ -26,6 +26,21 @@ import org.apache.ignite.lang.IgniteUuid; */ public interface GridCacheFuture extends IgniteInternalFuture { /** + * Gets start time for this future. + * + * @return Start time for this future. + */ + public long startTime(); + + /** + * Gets duration in milliseconds between start of the future and current time if future + * is not finished, or between start and finish of this future. + * + * @return Time in milliseconds this future has taken to execute. + */ + public long duration(); + + /** * @return Unique identifier for this future. */ public IgniteUuid futureId(); http://git-wip-us.apache.org/repos/asf/ignite/blob/e922dda6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheFutureAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheFutureAdapter.java new file mode 100644 index 0000000..babd707 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheFutureAdapter.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.jetbrains.annotations.Nullable; + +/** + * + */ +public abstract class GridCacheFutureAdapter extends GridFutureAdapter implements GridCacheFuture { + /** Future start time. */ + private final long startTime = U.currentTimeMillis(); + + /** Future end time. */ + private volatile long endTime; + + /** + * Default constructor. + */ + public GridCacheFutureAdapter() { + } + + /** {@inheritDoc} */ + @Override public long startTime() { + return startTime; + } + + /** {@inheritDoc} */ + @Override public long duration() { + long endTime = this.endTime; + + return endTime == 0 ? U.currentTimeMillis() - startTime : endTime - startTime; + } + + /** {@inheritDoc} */ + @Override protected boolean onDone(@Nullable R res, @Nullable Throwable err, boolean cancel) { + if(super.onDone(res, err, cancel)){ + endTime = U.currentTimeMillis(); + return true; + } + + return false; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/e922dda6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java index e27f777..1c97de2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java @@ -28,11 +28,11 @@ import org.apache.ignite.IgniteLogger; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; +import org.apache.ignite.internal.processors.cache.GridCacheCompoundIdentityFuture; import org.apache.ignite.internal.processors.cache.GridCacheFuture; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.util.GridLeanMap; -import org.apache.ignite.internal.util.future.GridCompoundIdentityFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.C1; @@ -49,7 +49,7 @@ import static org.apache.ignite.transactions.TransactionState.PREPARED; /** * Future verifying that all remote transactions related to transaction were prepared or committed. */ -public class GridCacheTxRecoveryFuture extends GridCompoundIdentityFuture implements GridCacheFuture { +public class GridCacheTxRecoveryFuture extends GridCacheCompoundIdentityFuture { /** */ private static final long serialVersionUID = 0L; @@ -426,7 +426,7 @@ public class GridCacheTxRecoveryFuture extends GridCompoundIdentityFuture { - /** */ - private static final long serialVersionUID = 0L; - /** Mini future ID. */ private final IgniteUuid futId = IgniteUuid.randomUuid(); http://git-wip-us.apache.org/repos/asf/ignite/blob/e922dda6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java index 4381dfd..259b096 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java @@ -25,11 +25,11 @@ import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.GridCacheCompoundIdentityFuture; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheFuture; import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy; import org.apache.ignite.internal.processors.cache.KeyCacheObject; -import org.apache.ignite.internal.util.future.GridCompoundIdentityFuture; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.lang.IgniteUuid; @@ -42,7 +42,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh /** * */ -public abstract class CacheDistributedGetFutureAdapter extends GridCompoundIdentityFuture> +public abstract class CacheDistributedGetFutureAdapter extends GridCacheCompoundIdentityFuture> implements GridCacheFuture>, CacheGetFuture { /** Default max remap count value. */ public static final int DFLT_MAX_REMAP_CNT = 3; http://git-wip-us.apache.org/repos/asf/ignite/blob/e922dda6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java index a0270b0..1a7c2c6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java @@ -38,6 +38,7 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheEntryPredicate; import org.apache.ignite.internal.processors.cache.CacheLockCandidates; import org.apache.ignite.internal.processors.cache.CacheObject; +import org.apache.ignite.internal.processors.cache.GridCacheCompoundIdentityFuture; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo; @@ -54,7 +55,6 @@ import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.dr.GridDrType; import org.apache.ignite.internal.processors.timeout.GridTimeoutObjectAdapter; -import org.apache.ignite.internal.util.future.GridCompoundIdentityFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.tostring.GridToStringInclude; @@ -77,7 +77,7 @@ import static org.apache.ignite.internal.processors.dr.GridDrType.DR_PRELOAD; /** * Cache lock future. */ -public final class GridDhtLockFuture extends GridCompoundIdentityFuture +public final class GridDhtLockFuture extends GridCacheCompoundIdentityFuture implements GridCacheMvccFuture, GridDhtFuture, GridCacheMappedVersion { /** */ private static final long serialVersionUID = 0L; @@ -298,10 +298,8 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture /** * @return Entries. */ - public Collection entriesCopy() { - synchronized (sync) { - return new ArrayList<>(entries()); - } + public synchronized Collection entriesCopy() { + return new ArrayList<>(entries()); } /** @@ -395,7 +393,7 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture return null; } - synchronized (sync) { + synchronized (this) { entries.add(c == null || c.reentry() ? null : entry); if (c != null && !c.reentry()) @@ -529,7 +527,7 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture @SuppressWarnings("ForLoopReplaceableByForEach") private MiniFuture miniFuture(IgniteUuid miniId) { // We iterate directly over the futs collection here to avoid copy. - synchronized (sync) { + synchronized (this) { int size = futuresCountNoLock(); // Avoid iterator creation. @@ -599,7 +597,7 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture * @param t Error. */ public void onError(Throwable t) { - synchronized (sync) { + synchronized (this) { if (err != null) return; @@ -646,7 +644,7 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture log.debug("Received onOwnerChanged() callback [entry=" + entry + ", owner=" + owner + "]"); if (owner != null && owner.version().equals(lockVer)) { - synchronized (sync) { + synchronized (this) { if (!pendingLocks.remove(entry.key())) return false; } @@ -663,10 +661,8 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture /** * @return {@code True} if locks have been acquired. */ - private boolean checkLocks() { - synchronized (sync) { - return pendingLocks.isEmpty(); - } + private synchronized boolean checkLocks() { + return pendingLocks.isEmpty(); } /** {@inheritDoc} */ @@ -697,7 +693,7 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture if (isDone() || (err == null && success && !checkLocks())) return false; - synchronized (sync) { + synchronized (this) { if (this.err == null) this.err = err; } @@ -776,7 +772,7 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture * @param entries Entries. */ private void map(Iterable entries) { - synchronized (sync) { + synchronized (this) { if (mapped) return; @@ -1120,7 +1116,7 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture if (log.isDebugEnabled()) log.debug("Timed out waiting for lock response: " + this); - synchronized (sync) { + synchronized (GridDhtLockFuture.this) { timedOut = true; // Stop locks and responses processing. @@ -1146,9 +1142,6 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture */ private class MiniFuture extends GridFutureAdapter { /** */ - private static final long serialVersionUID = 0L; - - /** */ private final IgniteUuid futId = IgniteUuid.randomUuid(); /** Node. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/e922dda6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java index 17e9047..23d7657 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java @@ -28,12 +28,12 @@ import org.apache.ignite.IgniteLogger; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; +import org.apache.ignite.internal.processors.cache.GridCacheCompoundIdentityFuture; import org.apache.ignite.internal.processors.cache.GridCacheFuture; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxMapping; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry; -import org.apache.ignite.internal.util.future.GridCompoundIdentityFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.tostring.GridToStringInclude; @@ -50,7 +50,7 @@ import static org.apache.ignite.transactions.TransactionState.COMMITTING; /** * */ -public final class GridDhtTxFinishFuture extends GridCompoundIdentityFuture +public final class GridDhtTxFinishFuture extends GridCacheCompoundIdentityFuture implements GridCacheFuture { /** */ private static final long serialVersionUID = 0L; http://git-wip-us.apache.org/repos/asf/ignite/blob/e922dda6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java index 93ea30d..964d423 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java @@ -42,6 +42,7 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheInvokeEntry; import org.apache.ignite.internal.processors.cache.CacheLockCandidates; import org.apache.ignite.internal.processors.cache.CacheObject; +import org.apache.ignite.internal.processors.cache.GridCacheCompoundFuture; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo; @@ -97,7 +98,7 @@ import static org.apache.ignite.transactions.TransactionState.PREPARED; * */ @SuppressWarnings("unchecked") -public final class GridDhtTxPrepareFuture extends GridCompoundFuture +public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture implements GridCacheMvccFuture { /** */ private static final long serialVersionUID = 0L; @@ -279,7 +280,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture { /** */ - private static final long serialVersionUID = 0L; - - /** */ private final int futId; /** Node ID. */ @@ -1811,7 +1809,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture implements GridCacheFuture, +public class GridPartitionedSingleGetFuture extends GridCacheFutureAdapter implements GridCacheFuture, CacheGetFuture { /** */ private static final long serialVersionUID = 0L; http://git-wip-us.apache.org/repos/asf/ignite/blob/e922dda6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java index 0940acb..039cb99 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java @@ -36,12 +36,12 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.GridCacheAtomicFuture; import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.GridCacheFutureAdapter; import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException; import org.apache.ignite.internal.processors.cache.GridCacheReturn; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; -import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.CI1; @@ -58,7 +58,7 @@ import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC /** * DHT atomic cache backup update future. */ -public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapter +public abstract class GridDhtAtomicAbstractUpdateFuture extends GridCacheFutureAdapter implements GridCacheAtomicFuture { /** */ private static final long serialVersionUID = 0L; http://git-wip-us.apache.org/repos/asf/ignite/blob/e922dda6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java index a2adb05..122e17c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java @@ -38,6 +38,7 @@ import org.apache.ignite.internal.processors.cache.CacheEntryPredicate; import org.apache.ignite.internal.processors.cache.CachePartialUpdateCheckedException; import org.apache.ignite.internal.processors.cache.GridCacheAtomicFuture; import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.GridCacheFutureAdapter; import org.apache.ignite.internal.processors.cache.GridCacheMvccManager; import org.apache.ignite.internal.processors.cache.GridCacheOperation; import org.apache.ignite.internal.processors.cache.GridCacheReturn; @@ -58,7 +59,7 @@ import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC /** * Base for near atomic update futures. */ -public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapter +public abstract class GridNearAtomicAbstractUpdateFuture extends GridCacheFutureAdapter implements GridCacheAtomicFuture { /** Logger reference. */ private static final AtomicReference logRef = new AtomicReference<>(); @@ -114,9 +115,6 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt /** Near cache flag. */ protected final boolean nearEnabled; - /** Mutex to synchronize state updates. */ - protected final Object mux = new Object(); - /** Topology locked flag. Set if atomic update is performed inside a TX or explicit lock. */ protected boolean topLocked; @@ -138,7 +136,7 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt /** Future ID. */ @GridToStringInclude - protected long futId; + protected volatile long futId; /** Operation result. */ protected GridCacheReturn opRes; http://git-wip-us.apache.org/repos/asf/ignite/blob/e922dda6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java index e4ba457..11c3336 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java @@ -126,9 +126,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda /** {@inheritDoc} */ @Override public long id() { - synchronized (mux) { - return futId; - } + return futId; } /** {@inheritDoc} */ @@ -141,7 +139,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda boolean rcvAll = false; - synchronized (mux) { + synchronized (this) { if (reqState == null) return false; @@ -215,7 +213,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda CachePartialUpdateCheckedException err0; AffinityTopologyVersion remapTopVer0; - synchronized (mux) { + synchronized (this) { if (futId == 0 || futId != res.futureId()) return; @@ -257,7 +255,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda GridCacheReturn opRes0 = null; CachePartialUpdateCheckedException err0 = null; - synchronized (mux) { + synchronized (this) { if (futId == 0 || futId != res.futureId()) return; @@ -331,7 +329,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda * @return Non-null topology version if update should be remapped. */ private AffinityTopologyVersion onAllReceived() { - assert Thread.holdsLock(mux); + assert Thread.holdsLock(this); assert futId > 0; AffinityTopologyVersion remapTopVer0 = null; @@ -488,7 +486,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda try { reqState0 = mapSingleUpdate(topVer, futId); - synchronized (mux) { + synchronized (this) { assert this.futId == 0 : this; assert this.topVer == AffinityTopologyVersion.ZERO : this; @@ -537,7 +535,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda GridNearAtomicCheckUpdateRequest checkReq = null; - synchronized (mux) { + synchronized (this) { if (this.futId == 0 || this.futId != futId) return; @@ -568,7 +566,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda private Long onFutureDone() { Long id0; - synchronized (mux) { + synchronized (this) { id0 = futId; futId = 0; @@ -734,9 +732,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda } /** {@inheritDoc} */ - public String toString() { - synchronized (mux) { - return S.toString(GridNearAtomicSingleUpdateFuture.class, this, super.toString()); - } + public synchronized String toString() { + return S.toString(GridNearAtomicSingleUpdateFuture.class, this, super.toString()); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/e922dda6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java index 84deefc..6198de4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java @@ -151,9 +151,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu /** {@inheritDoc} */ @Override public long id() { - synchronized (mux) { - return futId; - } + return futId; } /** {@inheritDoc} */ @@ -166,7 +164,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu List checkReqs = null; - synchronized (mux) { + synchronized (this) { if (futId == 0) return false; @@ -299,7 +297,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu CachePartialUpdateCheckedException err0; AffinityTopologyVersion remapTopVer0; - synchronized (mux) { + synchronized (this) { if (futId == 0 || futId != res.futureId()) return; @@ -372,7 +370,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu boolean rcvAll; - synchronized (mux) { + synchronized (this) { if (futId == 0 || futId != res.futureId()) return; @@ -534,7 +532,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu * @return Non null topology version if update should be remapped. */ @Nullable private AffinityTopologyVersion onAllReceived() { - assert Thread.holdsLock(mux); + assert Thread.holdsLock(this); assert futId > 0; AffinityTopologyVersion remapTopVer0 = null; @@ -801,7 +799,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu } } - synchronized (mux) { + synchronized (this) { assert this.futId == 0 : this; assert this.topVer == AffinityTopologyVersion.ZERO : this; @@ -866,7 +864,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu boolean rcvAll = false; - synchronized (mux) { + synchronized (this) { if (this.futId == 0 || this.futId != futId) return; @@ -938,7 +936,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu private Long onFutureDone() { Long id0; - synchronized (mux) { + synchronized (this) { id0 = futId; futId = 0; @@ -1181,9 +1179,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu } /** {@inheritDoc} */ - public String toString() { - synchronized (mux) { - return S.toString(GridNearAtomicUpdateFuture.class, this, super.toString()); - } + public synchronized String toString() { + return S.toString(GridNearAtomicUpdateFuture.class, this, super.toString()); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/e922dda6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java index 79c15fb..8512298 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java @@ -36,6 +36,7 @@ import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheEntryPredicate; import org.apache.ignite.internal.processors.cache.CacheObject; +import org.apache.ignite.internal.processors.cache.GridCacheCompoundIdentityFuture; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException; @@ -56,7 +57,6 @@ import org.apache.ignite.internal.processors.cache.transactions.TxDeadlock; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.timeout.GridTimeoutObjectAdapter; import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException; -import org.apache.ignite.internal.util.future.GridCompoundIdentityFuture; import org.apache.ignite.internal.util.future.GridEmbeddedFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.tostring.GridToStringExclude; @@ -82,7 +82,7 @@ import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_READ; /** * Colocated cache lock future. */ -public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture +public final class GridDhtColocatedLockFuture extends GridCacheCompoundIdentityFuture implements GridCacheMvccFuture { /** */ private static final long serialVersionUID = 0L; @@ -203,7 +203,7 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture this.skipStore = skipStore; this.keepBinary = keepBinary; - ignoreInterrupts(true); + ignoreInterrupts(); threadId = tx == null ? Thread.currentThread().getId() : tx.threadId(); @@ -452,13 +452,11 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture /** * @return Keys for which locks requested from remote nodes but response isn't received. */ - public Set requestedKeys() { - synchronized (sync) { - if (timeoutObj != null && timeoutObj.requestedKeys != null) - return timeoutObj.requestedKeys; + public synchronized Set requestedKeys() { + if (timeoutObj != null && timeoutObj.requestedKeys != null) + return timeoutObj.requestedKeys; - return requestedKeys0(); - } + return requestedKeys0(); } /** @@ -490,7 +488,7 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture @SuppressWarnings({"ForLoopReplaceableByForEach", "IfMayBeConditional"}) private MiniFuture miniFuture(int miniId) { // We iterate directly over the futs collection here to avoid copy. - synchronized (sync) { + synchronized (this) { int size = futuresCountNoLock(); // Avoid iterator creation. @@ -1341,7 +1339,7 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture log.debug("Timed out waiting for lock response: " + this); if (inTx() && cctx.tm().deadlockDetectionEnabled()) { - synchronized (sync) { + synchronized (GridDhtColocatedLockFuture.this) { requestedKeys = requestedKeys0(); clear(); // Stop response processing. @@ -1390,9 +1388,6 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture */ private class MiniFuture extends GridFutureAdapter { /** */ - private static final long serialVersionUID = 0L; - - /** */ private final int futId; /** Node ID. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/e922dda6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index f41da2b..55aca2a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -171,9 +171,6 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter partReleaseFut; - /** */ - private final Object mux = new Object(); - /** Logger. */ private IgniteLogger log; @@ -1087,7 +1084,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter 0) + U.wait(this); + } + catch (IgniteInterruptedCheckedException e) { + U.warn(log, "Failed to wait for partition map updates, thread was interrupted: " + e); } } @@ -1316,7 +1311,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter nodes; - synchronized (mux) { + synchronized (this) { srvNodes.remove(cctx.localNode()); nodes = new ArrayList<>(srvNodes); @@ -1423,7 +1418,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter remaining; List srvNodes; - synchronized (mux) { + synchronized (this) { remaining = new HashSet<>(this.remaining); srvNodes = this.srvNodes != null ? new ArrayList<>(this.srvNodes) : null; } http://git-wip-us.apache.org/repos/asf/ignite/blob/e922dda6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java index 1948df0..8de01c9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java @@ -37,6 +37,7 @@ import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheEntryPredicate; import org.apache.ignite.internal.processors.cache.CacheObject; +import org.apache.ignite.internal.processors.cache.GridCacheCompoundIdentityFuture; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException; @@ -54,7 +55,6 @@ import org.apache.ignite.internal.processors.cache.transactions.TxDeadlock; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.timeout.GridTimeoutObjectAdapter; import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException; -import org.apache.ignite.internal.util.future.GridCompoundIdentityFuture; import org.apache.ignite.internal.util.future.GridEmbeddedFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.tostring.GridToStringExclude; @@ -80,7 +80,7 @@ import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_READ; /** * Cache lock future. */ -public final class GridNearLockFuture extends GridCompoundIdentityFuture +public final class GridNearLockFuture extends GridCacheCompoundIdentityFuture implements GridCacheMvccFuture { /** */ private static final long serialVersionUID = 0L; @@ -209,7 +209,7 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture requestedKeys() { - synchronized (sync) { - if (timeoutObj != null && timeoutObj.requestedKeys != null) - return timeoutObj.requestedKeys; + public synchronized Set requestedKeys() { + if (timeoutObj != null && timeoutObj.requestedKeys != null) + return timeoutObj.requestedKeys; - return requestedKeys0(); - } + return requestedKeys0(); } /** @@ -537,7 +535,7 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture requestedKeys() { - synchronized (sync) { + synchronized (this) { int size = futuresCountNoLock(); for (int i = 0; i < size; i++) { @@ -239,7 +239,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa @SuppressWarnings("ForLoopReplaceableByForEach") private MiniFuture miniFuture(int miniId) { // We iterate directly over the futs collection here to avoid copy. - synchronized (sync) { + synchronized (this) { int size = futuresCountNoLock(); // Avoid iterator creation. @@ -694,7 +694,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa if (keyLockFut != null) keys = new HashSet<>(keyLockFut.lockKeys); else { - synchronized (sync) { + synchronized (this) { int size = futuresCountNoLock(); for (int i = 0; i < size; i++) { @@ -765,9 +765,6 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa * */ private static class MiniFuture extends GridFutureAdapter { - /** */ - private static final long serialVersionUID = 0L; - /** Receive result flag updater. */ private static final AtomicIntegerFieldUpdater RCV_RES_UPD = AtomicIntegerFieldUpdater.newUpdater(MiniFuture.class, "rcvRes"); http://git-wip-us.apache.org/repos/asf/ignite/blob/e922dda6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java index 4a443a9..cb15bca 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java @@ -132,7 +132,7 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA @SuppressWarnings("ForLoopReplaceableByForEach") private MiniFuture miniFuture(int miniId) { // We iterate directly over the futs collection here to avoid copy. - synchronized (sync) { + synchronized (this) { int size = futuresCountNoLock(); // Avoid iterator creation. @@ -365,9 +365,6 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA */ private class MiniFuture extends GridFutureAdapter { /** */ - private static final long serialVersionUID = 0L; - - /** */ private final int futId; /** */ http://git-wip-us.apache.org/repos/asf/ignite/blob/e922dda6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java index 1b0566b..37be0fb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java @@ -32,6 +32,7 @@ import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.NodeStoppingException; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.GridCacheCompoundIdentityFuture; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; import org.apache.ignite.internal.processors.cache.GridCacheFuture; @@ -46,7 +47,6 @@ import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.transactions.IgniteTxHeuristicCheckedException; import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException; -import org.apache.ignite.internal.util.future.GridCompoundIdentityFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.C1; @@ -66,7 +66,7 @@ import static org.apache.ignite.transactions.TransactionState.UNKNOWN; /** * */ -public final class GridNearTxFinishFuture extends GridCompoundIdentityFuture +public final class GridNearTxFinishFuture extends GridCacheCompoundIdentityFuture implements GridCacheFuture { /** */ private static final long serialVersionUID = 0L; @@ -114,7 +114,7 @@ public final class GridNearTxFinishFuture extends GridCompoundIdentityFutu this.tx = tx; this.commit = commit; - ignoreInterrupts(true); + ignoreInterrupts(); mappings = tx.mappings(); @@ -189,7 +189,7 @@ public final class GridNearTxFinishFuture extends GridCompoundIdentityFutu if (!isDone()) { FinishMiniFuture finishFut = null; - synchronized (sync) { + synchronized (this) { int size = futuresCountNoLock(); for (int i = 0; i < size; i++) { @@ -878,9 +878,6 @@ public final class GridNearTxFinishFuture extends GridCompoundIdentityFutu * */ private class FinishMiniFuture extends MinFuture { - /** */ - private static final long serialVersionUID = 0L; - /** Keys. */ @GridToStringInclude private GridDistributedTxMapping m; @@ -926,7 +923,7 @@ public final class GridNearTxFinishFuture extends GridCompoundIdentityFutu if (!F.isEmpty(backups)) { final CheckRemoteTxMiniFuture mini; - synchronized (sync) { + synchronized (GridNearTxFinishFuture.this) { int futId = Integer.MIN_VALUE + futuresCountNoLock(); mini = new CheckRemoteTxMiniFuture(futId, new HashSet<>(backups)); http://git-wip-us.apache.org/repos/asf/ignite/blob/e922dda6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java index 8be87d4..5baec99 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java @@ -3946,7 +3946,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements AutoClosea private IgniteInternalFuture nonInterruptable(IgniteInternalFuture fut) { // Safety. if (fut instanceof GridFutureAdapter) - ((GridFutureAdapter)fut).ignoreInterrupts(true); + ((GridFutureAdapter)fut).ignoreInterrupts(); return fut; } http://git-wip-us.apache.org/repos/asf/ignite/blob/e922dda6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java index f9a6353..7f1f5a2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java @@ -24,6 +24,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import javax.cache.expiry.ExpiryPolicy; import org.apache.ignite.IgniteLogger; +import org.apache.ignite.internal.processors.cache.GridCacheCompoundFuture; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException; import org.apache.ignite.internal.processors.cache.GridCacheMvccFuture; @@ -35,7 +36,6 @@ import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; -import org.apache.ignite.internal.util.future.GridCompoundFuture; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.internal.CU; @@ -49,7 +49,7 @@ import static org.apache.ignite.internal.processors.cache.GridCacheOperation.NOO * Common code for tx prepare in optimistic and pessimistic modes. */ public abstract class GridNearTxPrepareFutureAdapter extends - GridCompoundFuture implements GridCacheMvccFuture { + GridCacheCompoundFuture implements GridCacheMvccFuture { /** Logger reference. */ protected static final AtomicReference logRef = new AtomicReference<>(); http://git-wip-us.apache.org/repos/asf/ignite/blob/e922dda6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalLockFuture.java index 8e224c8..d8e95b9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalLockFuture.java @@ -32,6 +32,7 @@ import org.apache.ignite.internal.processors.cache.CacheEntryPredicate; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException; +import org.apache.ignite.internal.processors.cache.GridCacheFutureAdapter; import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate; import org.apache.ignite.internal.processors.cache.GridCacheMvccFuture; import org.apache.ignite.internal.processors.cache.KeyCacheObject; @@ -41,7 +42,6 @@ import org.apache.ignite.internal.processors.cache.transactions.TxDeadlock; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.timeout.GridTimeoutObjectAdapter; import org.apache.ignite.transactions.TransactionDeadlockException; -import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.internal.S; @@ -53,7 +53,7 @@ import org.jetbrains.annotations.Nullable; /** * Cache lock future. */ -public final class GridLocalLockFuture extends GridFutureAdapter +public final class GridLocalLockFuture extends GridCacheFutureAdapter implements GridCacheMvccFuture { /** */ private static final long serialVersionUID = 0L; @@ -135,7 +135,7 @@ public final class GridLocalLockFuture extends GridFutureAdapter this.filter = filter; this.tx = tx; - ignoreInterrupts(true); + ignoreInterrupts(); threadId = tx == null ? Thread.currentThread().getId() : tx.threadId(); http://git-wip-us.apache.org/repos/asf/ignite/blob/e922dda6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryFuture.java index 6110e0c..4c8e34f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryFuture.java @@ -71,7 +71,7 @@ public class GridCacheDistributedQueryFuture extends GridCacheQueryFutu assert mgr != null; - synchronized (mux) { + synchronized (this) { for (ClusterNode node : nodes) subgrid.add(node.id()); } @@ -87,7 +87,7 @@ public class GridCacheDistributedQueryFuture extends GridCacheQueryFutu Collection allNodes = cctx.discovery().allNodes(); Collection nodes; - synchronized (mux) { + synchronized (this) { nodes = F.retain(allNodes, true, new P1() { @Override public boolean apply(ClusterNode node) { @@ -139,7 +139,7 @@ public class GridCacheDistributedQueryFuture extends GridCacheQueryFutu @Override protected void onNodeLeft(UUID nodeId) { boolean callOnPage; - synchronized (mux) { + synchronized (this) { callOnPage = !loc && subgrid.contains(nodeId); } @@ -166,7 +166,7 @@ public class GridCacheDistributedQueryFuture extends GridCacheQueryFutu /** {@inheritDoc} */ @Override protected boolean onPage(UUID nodeId, boolean last) { - assert Thread.holdsLock(mux); + assert Thread.holdsLock(this); if (!loc) { rcvd.add(nodeId); @@ -192,11 +192,11 @@ public class GridCacheDistributedQueryFuture extends GridCacheQueryFutu /** {@inheritDoc} */ @SuppressWarnings("NonPrivateFieldAccessedInSynchronizedContext") @Override protected void loadPage() { - assert !Thread.holdsLock(mux); + assert !Thread.holdsLock(this); Collection nodes = null; - synchronized (mux) { + synchronized (this) { if (!isDone() && rcvd.containsAll(subgrid)) { rcvd.clear(); @@ -211,13 +211,13 @@ public class GridCacheDistributedQueryFuture extends GridCacheQueryFutu /** {@inheritDoc} */ @SuppressWarnings("NonPrivateFieldAccessedInSynchronizedContext") @Override protected void loadAllPages() throws IgniteInterruptedCheckedException { - assert !Thread.holdsLock(mux); + assert !Thread.holdsLock(this); U.await(firstPageLatch); Collection nodes = null; - synchronized (mux) { + synchronized (this) { if (!isDone() && !subgrid.isEmpty()) nodes = nodes(); } @@ -230,7 +230,7 @@ public class GridCacheDistributedQueryFuture extends GridCacheQueryFutu * @return Nodes to send requests to. */ private Collection nodes() { - assert Thread.holdsLock(mux); + assert Thread.holdsLock(this); Collection nodes = new ArrayList<>(subgrid.size());