ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject [02/14] ignite git commit: fixed https://issues.apache.org/jira/browse/IGNITE-2329 + forceKeysFuture opts + disabled partition map
Date Fri, 05 Feb 2016 12:25:38 GMT
fixed https://issues.apache.org/jira/browse/IGNITE-2329 + forceKeysFuture opts + disabled partition
map


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f85d2e68
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f85d2e68
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f85d2e68

Branch: refs/heads/ignite-2329-1
Commit: f85d2e68e9ea5c8af44d44c531be016beef102e1
Parents: 914b365
Author: Yakov Zhdanov <yzhdanov@gridgain.com>
Authored: Wed Jan 27 18:42:41 2016 +0300
Committer: Yakov Zhdanov <yzhdanov@gridgain.com>
Committed: Wed Jan 27 18:42:41 2016 +0300

----------------------------------------------------------------------
 .../dht/GridClientPartitionTopology.java        |  5 ++
 .../distributed/dht/GridDhtCacheAdapter.java    |  3 +-
 .../cache/distributed/dht/GridDhtGetFuture.java | 35 ++++++---
 .../distributed/dht/GridDhtLocalPartition.java  | 76 +++++++++++---------
 .../distributed/dht/GridDhtPartitionState.java  |  2 +-
 .../dht/GridDhtPartitionTopology.java           |  5 ++
 .../dht/GridDhtPartitionTopologyImpl.java       |  9 +++
 .../dht/preloader/GridDhtPreloader.java         |  5 +-
 .../util/future/GridCompoundFuture.java         | 39 +++++-----
 .../ignite/internal/util/nio/GridNioServer.java |  6 +-
 .../util/nio/GridSelectorNioSessionImpl.java    |  2 +-
 .../IgniteTxPreloadAbstractTest.java            |  2 +-
 12 files changed, 119 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/f85d2e68/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
index dcfc038..ad4943e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
@@ -336,6 +336,11 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology
{
     }
 
     /** {@inheritDoc} */
+    @Override public void releasePartitions(int... parts) {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
     @Override public List<GridDhtLocalPartition> localPartitions() {
         return Collections.emptyList();
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/f85d2e68/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
index 9cf8084..c3146d1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
@@ -695,7 +695,8 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
         @Nullable UUID subjId,
         int taskNameHash,
         @Nullable IgniteCacheExpiryPolicy expiry,
-        boolean skipVals) {
+        boolean skipVals
+    ) {
         GridDhtGetFuture<K, V> fut = new GridDhtGetFuture<>(ctx,
             msgId,
             reader,

http://git-wip-us.apache.org/repos/asf/ignite/blob/f85d2e68/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
index ab166d5..fb417df 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.processors.cache.distributed.dht;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
@@ -82,7 +83,7 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col
     private Map<KeyCacheObject, Boolean> keys;
 
     /** Reserved partitions. */
-    private Collection<GridDhtLocalPartition> parts = new HashSet<>();
+    private int[] parts;
 
     /** Future ID. */
     private IgniteUuid futId;
@@ -194,8 +195,8 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col
     @Override public boolean onDone(Collection<GridCacheEntryInfo> res, Throwable err)
{
         if (super.onDone(res, err)) {
             // Release all partitions reserved by this future.
-            for (GridDhtLocalPartition part : parts)
-                part.release();
+            if (parts != null)
+                cctx.topology().releasePartitions(parts);
 
             return true;
         }
@@ -239,7 +240,6 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col
             map0(keys);
     }
 
-
     /**
      * @param keys Keys to map.
      */
@@ -251,7 +251,7 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col
             int part = cctx.affinity().partition(key.getKey());
 
             if (retries == null || !retries.contains(part)) {
-                if (!map(key.getKey(), parts)) {
+                if (!map(key.getKey())) {
                     if (retries == null)
                         retries = new HashSet<>();
 
@@ -274,15 +274,28 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col
         }
 
         // Add new future.
-        add(getAsync(mappedKeys == null ? keys : mappedKeys));
+        IgniteInternalFuture<Collection<GridCacheEntryInfo>> fut = getAsync(mappedKeys
== null ? keys : mappedKeys);
+
+        // Optimization to avoid going through compound future,
+        // if getAsync() has been completed and no other futures added to this
+        // compound future.
+        if (fut.isDone() && futuresSize() == 0) {
+            if (fut.error() != null)
+                onDone(fut.error());
+            else
+                onDone(fut.result());
+
+            return;
+        }
+
+        add(fut);
     }
 
     /**
      * @param key Key.
-     * @param parts Parts to map.
      * @return {@code True} if mapped.
      */
-    private boolean map(KeyCacheObject key, Collection<GridDhtLocalPartition> parts)
{
+    private boolean map(KeyCacheObject key) {
         GridDhtLocalPartition part = topVer.topologyVersion() > 0 ?
             cache().topology().localPartition(cctx.affinity().partition(key), topVer, true)
:
             cache().topology().localPartition(key, false);
@@ -290,10 +303,12 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col
         if (part == null)
             return false;
 
-        if (!parts.contains(part)) {
+        if (parts == null || !F.contains(parts, part.id())) {
             // By reserving, we make sure that partition won't be unloaded while processed.
             if (part.reserve()) {
-                parts.add(part);
+                parts = parts == null ? new int[1] : Arrays.copyOf(parts, parts.length +
1);
+
+                parts[parts.length - 1] = part.id();
 
                 return true;
             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/f85d2e68/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
index c4312b5..4fc1eaf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
@@ -26,7 +26,6 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.atomic.AtomicStampedReference;
 import java.util.concurrent.locks.ReentrantLock;
 import javax.cache.CacheException;
 import org.apache.ignite.IgniteCheckedException;
@@ -83,8 +82,7 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>,
 
     /** State. */
     @GridToStringExclude
-    private final AtomicStampedReference<GridDhtPartitionState> state =
-        new AtomicStampedReference<>(MOVING, 0);
+    private final AtomicLong state = new AtomicLong((long)MOVING.ordinal() << 32);
 
     /** Rent future. */
     @GridToStringExclude
@@ -153,8 +151,9 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>,
      * @return {@code false} If such reservation already added.
      */
     public boolean addReservation(GridDhtPartitionsReservation r) {
-        assert state.getReference() != EVICTED : "we can reserve only active partitions";
-        assert state.getStamp() != 0 : "partition must be already reserved before adding
group reservation";
+        assert GridDhtPartitionState.fromOrdinal((int)(state.get() >> 32)) != EVICTED
:
+            "we can reserve only active partitions";
+        assert (state.get() & 0xFFFF) != 0 : "partition must be already reserved before
adding group reservation";
 
         return reservations.addIfAbsent(r);
     }
@@ -185,14 +184,14 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>,
      * @return Partition state.
      */
     public GridDhtPartitionState state() {
-        return state.getReference();
+        return GridDhtPartitionState.fromOrdinal((int)(state.get() >> 32));
     }
 
     /**
      * @return Reservations.
      */
     public int reservations() {
-        return state.getStamp();
+        return (int)(state.get() & 0xFFFF);
     }
 
     /**
@@ -385,14 +384,12 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>,
      */
     @Override public boolean reserve() {
         while (true) {
-            int reservations = state.getStamp();
+            long reservations = state.get();
 
-            GridDhtPartitionState s = state.getReference();
-
-            if (s == EVICTED)
+            if ((int)(reservations >> 32) == EVICTED.ordinal())
                 return false;
 
-            if (state.compareAndSet(s, s, reservations, reservations + 1))
+            if (state.compareAndSet(reservations, reservations + 1))
                 return true;
         }
     }
@@ -402,17 +399,15 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>,
      */
     @Override public void release() {
         while (true) {
-            int reservations = state.getStamp();
+            long reservations = state.get();
 
-            if (reservations == 0)
+            if ((int)(reservations & 0xFFFF) == 0)
                 return;
 
-            GridDhtPartitionState s = state.getReference();
-
-            assert s != EVICTED;
+            assert (int)(reservations >> 32) != EVICTED.ordinal();
 
             // Decrement reservations.
-            if (state.compareAndSet(s, s, reservations, --reservations)) {
+            if (state.compareAndSet(reservations, --reservations)) {
                 tryEvict();
 
                 break;
@@ -421,23 +416,32 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>,
     }
 
     /**
+     * @param reservations Current aggregated value.
+     * @param toState State to switch to.
+     * @return {@code true} if cas succeeds.
+     */
+    private boolean casState(long reservations, GridDhtPartitionState toState) {
+        return state.compareAndSet(reservations, (reservations & 0xFFFF) | ((long)toState.ordinal()
<< 32));
+    }
+
+    /**
      * @return {@code True} if transitioned to OWNING state.
      */
     boolean own() {
         while (true) {
-            int reservations = state.getStamp();
+            long reservations = state.get();
 
-            GridDhtPartitionState s = state.getReference();
+            int ord = (int)(reservations >> 32);
 
-            if (s == RENTING || s == EVICTED)
+            if (ord == RENTING.ordinal() || ord == EVICTED.ordinal())
                 return false;
 
-            if (s == OWNING)
+            if (ord == OWNING.ordinal())
                 return true;
 
-            assert s == MOVING;
+            assert ord == MOVING.ordinal();
 
-            if (state.compareAndSet(MOVING, OWNING, reservations, reservations)) {
+            if (casState(reservations, OWNING)) {
                 if (log.isDebugEnabled())
                     log.debug("Owned partition: " + this);
 
@@ -455,14 +459,14 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>,
      */
     IgniteInternalFuture<?> rent(boolean updateSeq) {
         while (true) {
-            int reservations = state.getStamp();
+            long reservations = state.get();
 
-            GridDhtPartitionState s = state.getReference();
+            int ord = (int)(reservations >> 32);
 
-            if (s == RENTING || s == EVICTED)
+            if (ord == RENTING.ordinal() || ord == EVICTED.ordinal())
                 return rent;
 
-            if (state.compareAndSet(s, RENTING, reservations, reservations)) {
+            if (casState(reservations, RENTING)) {
                 if (log.isDebugEnabled())
                     log.debug("Moved partition to RENTING state: " + this);
 
@@ -481,9 +485,13 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>,
      * @param updateSeq Update sequence.
      */
     void tryEvictAsync(boolean updateSeq) {
+        long reservations = state.get();
+
+        int ord = (int)(reservations >> 32);
+
         if (map.isEmpty() && !GridQueryProcessor.isEnabled(cctx.config()) &&
-            state.getReference() == RENTING && state.getStamp() == 0 &&
-            state.compareAndSet(RENTING, EVICTED, 0, 0)) {
+            ord == RENTING.ordinal() && (reservations & 0xFFFF) == 0 &&
+            casState(reservations, EVICTED)) {
             if (log.isDebugEnabled())
                 log.debug("Evicted partition: " + this);
 
@@ -520,13 +528,17 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>,
      *
      */
     public void tryEvict() {
-        if (state.getReference() != RENTING || state.getStamp() != 0 || groupReserved())
+        long reservations = state.get();
+
+        int ord = (int)(reservations >> 32);
+
+        if (ord != RENTING.ordinal() || (reservations & 0xFFFF) != 0 || groupReserved())
             return;
 
         // Attempt to evict partition entries from cache.
         clearAll();
 
-        if (map.isEmpty() && state.compareAndSet(RENTING, EVICTED, 0, 0)) {
+        if (map.isEmpty() && casState(reservations, EVICTED)) {
             if (log.isDebugEnabled())
                 log.debug("Evicted partition: " + this);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/f85d2e68/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionState.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionState.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionState.java
index 7b49369..041f135 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionState.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionState.java
@@ -52,4 +52,4 @@ public enum GridDhtPartitionState {
     public boolean active() {
         return this != EVICTED;
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/f85d2e68/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
index dd06d6f..84889f8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
@@ -112,6 +112,11 @@ public interface GridDhtPartitionTopology {
         throws GridDhtInvalidPartitionException;
 
     /**
+     * @param parts Partitions to release (should be reserved before).
+     */
+    public void releasePartitions(int... parts);
+
+    /**
      * @param key Cache key.
      * @param create If {@code true}, then partition will be created if it's not there.
      * @return Local partition.

http://git-wip-us.apache.org/repos/asf/ignite/blob/f85d2e68/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index 2ab8a12..8269378 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -611,6 +611,15 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology
{
     }
 
     /** {@inheritDoc} */
+    @Override public void releasePartitions(int... parts) {
+        assert parts != null;
+        assert parts.length > 0;
+
+        for (int i = 0; i < parts.length; i++)
+            locParts.get(parts[i]).release();
+    }
+
+    /** {@inheritDoc} */
     @Override public GridDhtLocalPartition localPartition(Object key, boolean create) {
         return localPartition(cctx.affinity().partition(key), AffinityTopologyVersion.NONE,
create);
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/f85d2e68/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index 9f1e25d..a72596c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -403,6 +403,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
 
         try {
             demandLock.readLock().lock();
+
             try {
                 demander.handleSupplyMessage(idx, id, s);
             }
@@ -698,7 +699,9 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
      */
     @SuppressWarnings( {"unchecked", "RedundantCast"})
     @Override public GridDhtFuture<Object> request(Collection<KeyCacheObject>
keys, AffinityTopologyVersion topVer) {
-        if (rebalanceFuture().isDone())
+        if (cctx.rebalanceEnabled() &&
+            rebalanceFuture().isDone() &&
+            Boolean.TRUE.equals(rebalanceFuture().result()))
             return null;
 
         final GridDhtForceKeysFuture<?, ?> fut = new GridDhtForceKeysFuture<>(cctx,
topVer, keys, this);

http://git-wip-us.apache.org/repos/asf/ignite/blob/f85d2e68/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java
index c382497..9bec886 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java
@@ -33,6 +33,7 @@ import org.jetbrains.annotations.Nullable;
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 
 /**
@@ -54,7 +55,7 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R>
implements Ig
         AtomicIntegerFieldUpdater.newUpdater(GridCompoundFuture.class, "lsnrCalls");
 
     /** Futures. */
-    protected final ArrayList<IgniteInternalFuture<T>> futs = new ArrayList<>();
+    protected ArrayList<IgniteInternalFuture<T>> futs;
 
     /** Reducer. */
     @GridToStringInclude
@@ -154,10 +155,8 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R>
implements Ig
      *
      * @return Collection of futures.
      */
-    public Collection<IgniteInternalFuture<T>> futures() {
-        synchronized (futs) {
-            return new ArrayList<>(futs);
-        }
+    public synchronized Collection<IgniteInternalFuture<T>> futures() {
+        return futs == null ? Collections.<IgniteInternalFuture<T>>emptyList()
: new ArrayList<>(futs);
     }
 
     /**
@@ -178,15 +177,16 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R>
implements Ig
      * @return {@code True} if there are pending futures.
      */
     @SuppressWarnings("ForLoopReplaceableByForEach")
-    public boolean hasPending() {
-        synchronized (futs) {
-            // Avoid iterator creation and collection copy.
-            for (int i = 0; i < futs.size(); i++) {
-                IgniteInternalFuture<T> fut = futs.get(i);
-
-                if (!fut.isDone())
-                    return true;
-            }
+    public synchronized boolean hasPending() {
+        if (futs == null)
+            return false;
+
+        // Avoid iterator creation and collection copy.
+        for (int i = 0; i < futs.size(); i++) {
+            IgniteInternalFuture<T> fut = futs.get(i);
+
+            if (!fut.isDone())
+                return true;
         }
 
         return false;
@@ -200,7 +200,10 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R>
implements Ig
     public void add(IgniteInternalFuture<T> fut) {
         assert fut != null;
 
-        synchronized (futs) {
+        synchronized (this) {
+            if (futs == null)
+                futs = new ArrayList<>();
+
             futs.add(fut);
         }
 
@@ -258,10 +261,8 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R>
implements Ig
     /**
      * @return Futures size.
      */
-    private int futuresSize() {
-        synchronized (futs) {
-            return futs.size();
-        }
+    protected synchronized int futuresSize() {
+        return futs == null ? 0 : futs.size();
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/f85d2e68/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
index c44e1ac..e4a5063 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
@@ -453,10 +453,8 @@ public class GridNioServer<T> {
             // Change from 0 to 1 means that worker thread should be waken up.
             clientWorkers.get(ses.selectorIndex()).offer(fut);
 
-        IgniteBiInClosure<GridNioSession, Integer> lsnr0 = msgQueueLsnr;
-
-        if (lsnr0 != null)
-            lsnr0.apply(ses, msgCnt);
+        if (msgQueueLsnr != null)
+            msgQueueLsnr.apply(ses, msgCnt);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/f85d2e68/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
index deb7d2b..1241f99 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
@@ -309,4 +309,4 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl {
     @Override public String toString() {
         return S.toString(GridSelectorNioSessionImpl.class, this, super.toString());
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/f85d2e68/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxPreloadAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxPreloadAbstractTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxPreloadAbstractTest.java
index 0a7845b..662cee3 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxPreloadAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxPreloadAbstractTest.java
@@ -231,4 +231,4 @@ public abstract class IgniteTxPreloadAbstractTest extends GridCacheAbstractSelfT
 
         return cfg;
     }
-}
\ No newline at end of file
+}


Mime
View raw message