ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject [1/3] ignite git commit: WIP.
Date Mon, 21 Mar 2016 11:31:21 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-put-experimental [created] 207a4c917


WIP.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ee363b9f
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ee363b9f
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ee363b9f

Branch: refs/heads/ignite-put-experimental
Commit: ee363b9fc0f5035d3f976a25e58426979578e6d8
Parents: 865e376
Author: thatcoach <ppozerov@list.ru>
Authored: Sat Mar 19 23:50:53 2016 +0300
Committer: thatcoach <ppozerov@list.ru>
Committed: Sat Mar 19 23:50:53 2016 +0300

----------------------------------------------------------------------
 .../managers/communication/GridIoManager.java   | 55 ++++++++--------
 .../deployment/GridDeploymentLocalStore.java    | 39 ++++++------
 .../processors/cache/GridCacheMvccManager.java  | 40 ++++++------
 .../processors/cache/GridCacheUtils.java        | 56 ++++++++--------
 .../dht/atomic/GridDhtAtomicCache.java          | 43 +++++++------
 .../ignite/internal/util/lang/GridFunc.java     | 52 +++++++--------
 .../ignite/internal/util/nio/GridNioServer.java | 59 ++++++++---------
 .../util/nio/GridSelectorNioSessionImpl.java    | 15 +++--
 .../communication/tcp/TcpCommunicationSpi.java  | 67 ++++++++++----------
 9 files changed, 216 insertions(+), 210 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/ee363b9f/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index 9ffbf4e..c4ca984 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -17,27 +17,6 @@
 
 package org.apache.ignite.internal.managers.communication;
 
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Queue;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentLinkedDeque;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.cluster.ClusterNode;
@@ -81,7 +60,27 @@ import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
 import org.apache.ignite.thread.IgniteThreadPoolExecutor;
 import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentHashMap8;
-import org.jsr166.ConcurrentLinkedDeque8;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Queue;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
 import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
@@ -124,7 +123,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
     private final Object sysLsnrsMux = new Object();
 
     /** Disconnect listeners. */
-    private final Collection<GridDisconnectListener> disconnectLsnrs = new ConcurrentLinkedQueue<>();
+    private final Collection<GridDisconnectListener> disconnectLsnrs = new LinkedBlockingQueue<>();
 
     /** Map of {@link IoPool}-s injected by Ignite plugins. */
     private final IoPool[] ioPools = new IoPool[128];
@@ -164,7 +163,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
     private final long discoDelay;
 
     /** Cache for messages that were received prior to discovery. */
-    private final ConcurrentMap<UUID, ConcurrentLinkedDeque8<DelayedMessage>> waitMap =
+    private final ConcurrentMap<UUID, LinkedBlockingDeque<DelayedMessage>> waitMap =
         new ConcurrentHashMap8<>();
 
     /** Communication message listener. */
@@ -418,7 +417,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
                         lock.writeLock().lock();
 
                         try {
-                            ConcurrentLinkedDeque8<DelayedMessage> waitList = waitMap.remove(nodeId);
+                            LinkedBlockingDeque<DelayedMessage> waitList = waitMap.remove(nodeId);
 
                             if (log.isDebugEnabled())
                                 log.debug("Removed messages from discovery startup delay list " +
@@ -448,9 +447,9 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
         try {
             started = true;
 
-            for (Entry<UUID, ConcurrentLinkedDeque8<DelayedMessage>> e : waitMap.entrySet()) {
+            for (Entry<UUID, LinkedBlockingDeque<DelayedMessage>> e : waitMap.entrySet()) {
                 if (ctx.discovery().node(e.getKey()) != null) {
-                    ConcurrentLinkedDeque8<DelayedMessage> waitList = waitMap.remove(e.getKey());
+                    LinkedBlockingDeque<DelayedMessage> waitList = waitMap.remove(e.getKey());
 
                     if (log.isDebugEnabled())
                         log.debug("Processing messages from discovery startup delay list: " + waitList);
@@ -610,7 +609,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
                             log.debug("Adding message to waiting list [senderId=" + nodeId +
                                 ", msg=" + msg + ']');
 
-                        ConcurrentLinkedDeque8<DelayedMessage> list =
+                        LinkedBlockingDeque<DelayedMessage> list =
                             F.addIfAbsent(waitMap, nodeId, F.<DelayedMessage>newDeque());
 
                         assert list != null;

http://git-wip-us.apache.org/repos/asf/ignite/blob/ee363b9f/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentLocalStore.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentLocalStore.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentLocalStore.java
index ab45708..024ba00 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentLocalStore.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentLocalStore.java
@@ -17,15 +17,6 @@
 
 package org.apache.ignite.internal.managers.deployment;
 
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentMap;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.compute.ComputeTask;
 import org.apache.ignite.compute.ComputeTaskName;
@@ -46,7 +37,17 @@ import org.apache.ignite.spi.deployment.DeploymentResource;
 import org.apache.ignite.spi.deployment.DeploymentSpi;
 import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentHashMap8;
-import org.jsr166.ConcurrentLinkedDeque8;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.LinkedBlockingDeque;
 
 import static org.apache.ignite.events.EventType.EVT_CLASS_DEPLOYED;
 import static org.apache.ignite.events.EventType.EVT_CLASS_DEPLOY_FAILED;
@@ -60,7 +61,7 @@ import static org.apache.ignite.events.EventType.EVT_TASK_UNDEPLOYED;
  */
 class GridDeploymentLocalStore extends GridDeploymentStoreAdapter {
     /** Deployment cache by class name. */
-    private final ConcurrentMap<String, ConcurrentLinkedDeque8<GridDeployment>> cache =
+    private final ConcurrentMap<String, LinkedBlockingDeque<GridDeployment>> cache =
         new ConcurrentHashMap8<>();
 
     /** Mutex. */
@@ -110,7 +111,7 @@ class GridDeploymentLocalStore extends GridDeploymentStoreAdapter {
         Collection<GridDeployment> deps = new ArrayList<>();
 
         synchronized (mux) {
-            for (ConcurrentLinkedDeque8<GridDeployment> depList : cache.values())
+            for (LinkedBlockingDeque<GridDeployment> depList : cache.values())
                 for (GridDeployment d : depList)
                     if (!deps.contains(d))
                         deps.add(d);
@@ -122,7 +123,7 @@ class GridDeploymentLocalStore extends GridDeploymentStoreAdapter {
     /** {@inheritDoc} */
     @Nullable @Override public GridDeployment getDeployment(IgniteUuid ldrId) {
         synchronized (mux) {
-            for (ConcurrentLinkedDeque8<GridDeployment> deps : cache.values())
+            for (LinkedBlockingDeque<GridDeployment> deps : cache.values())
                 for (GridDeployment dep : deps)
                     if (dep.classLoaderId().equals(ldrId))
                         return dep;
@@ -232,7 +233,7 @@ class GridDeploymentLocalStore extends GridDeploymentStoreAdapter {
      * @return Deployment.
      */
     @Nullable private GridDeployment deployment(String alias) {
-        ConcurrentLinkedDeque8<GridDeployment> deps = cache.get(alias);
+        LinkedBlockingDeque<GridDeployment> deps = cache.get(alias);
 
         if (deps != null) {
             GridDeployment dep = deps.peekFirst();
@@ -260,10 +261,10 @@ class GridDeploymentLocalStore extends GridDeploymentStoreAdapter {
             boolean fireEvt = false;
 
             try {
-                ConcurrentLinkedDeque8<GridDeployment> cachedDeps = null;
+                LinkedBlockingDeque<GridDeployment> cachedDeps = null;
 
                 // Find existing class loader info.
-                for (ConcurrentLinkedDeque8<GridDeployment> deps : cache.values()) {
+                for (LinkedBlockingDeque<GridDeployment> deps : cache.values()) {
                     for (GridDeployment d : deps) {
                         if (d.classLoader() == ldr) {
                             // Cache class and alias.
@@ -304,7 +305,7 @@ class GridDeploymentLocalStore extends GridDeploymentStoreAdapter {
                 assert fireEvt : "Class was not added to newly created deployment [cls=" + cls +
                     ", depMode=" + depMode + ", dep=" + dep + ']';
 
-                ConcurrentLinkedDeque8<GridDeployment> deps =
+                LinkedBlockingDeque<GridDeployment> deps =
                     F.addIfAbsent(cache, alias, F.<GridDeployment>newDeque());
 
                 if (!deps.isEmpty()) {
@@ -512,8 +513,8 @@ class GridDeploymentLocalStore extends GridDeploymentStoreAdapter {
         Collection<GridDeployment> doomed = new HashSet<>();
 
         synchronized (mux) {
-            for (Iterator<ConcurrentLinkedDeque8<GridDeployment>> i1 = cache.values().iterator(); i1.hasNext();) {
-                ConcurrentLinkedDeque8<GridDeployment> deps = i1.next();
+            for (Iterator<LinkedBlockingDeque<GridDeployment>> i1 = cache.values().iterator(); i1.hasNext();) {
+                LinkedBlockingDeque<GridDeployment> deps = i1.next();
 
                 for (Iterator<GridDeployment> i2 = deps.iterator(); i2.hasNext();) {
                     GridDeployment dep = i2.next();

http://git-wip-us.apache.org/repos/asf/ignite/blob/ee363b9f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
index afba4bc..b3fcd73 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
@@ -17,21 +17,8 @@
 
 package org.apache.ignite.internal.processors.cache;
 
-import java.util.ArrayDeque;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Deque;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ConcurrentMap;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.events.DiscoveryEvent;
 import org.apache.ignite.events.Event;
 import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
@@ -64,7 +51,20 @@ import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.lang.IgniteUuid;
 import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentHashMap8;
-import org.jsr166.ConcurrentLinkedDeque8;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.LinkedBlockingQueue;
 
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_MAX_NESTED_LISTENER_CALLS;
 import static org.apache.ignite.IgniteSystemProperties.getInteger;
@@ -116,7 +116,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
     private final ConcurrentMap<GridCacheVersion, GridCacheVersion> near2dht = newMap();
 
     /** Finish futures. */
-    private final ConcurrentLinkedDeque8<FinishLockFuture> finishFuts = new ConcurrentLinkedDeque8<>();
+    private final LinkedBlockingDeque<FinishLockFuture> finishFuts = new LinkedBlockingDeque<>();
 
     /** Nested listener calls. */
     private final ThreadLocal<Integer> nestedLsnrCalls = new ThreadLocal<Integer>() {
@@ -233,7 +233,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
         else if (log.isDebugEnabled())
             log.debug("Failed to find transaction for changed owner: " + owner);
 
-        if (!finishFuts.isEmptyx()) {
+        if (!finishFuts.isEmpty()) {
             for (FinishLockFuture f : finishFuts)
                 f.recheck(entry);
         }
@@ -964,7 +964,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
         X.println(">>>   lockedSize: " + locked.size());
         X.println(">>>   futsSize: " + (mvccFuts.size() + futs.size()));
         X.println(">>>   near2dhtSize: " + near2dht.size());
-        X.println(">>>   finishFutsSize: " + finishFuts.sizex());
+        X.println(">>>   finishFutsSize: " + finishFuts.size());
     }
 
     /**
@@ -984,7 +984,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
     public Map<IgniteTxKey, Collection<GridCacheMvccCandidate>> unfinishedLocks(AffinityTopologyVersion topVer) {
         Map<IgniteTxKey, Collection<GridCacheMvccCandidate>> cands = new HashMap<>();
 
-        if (!finishFuts.isEmptyx()) {
+        if (!finishFuts.isEmpty()) {
             for (FinishLockFuture fut : finishFuts) {
                 if (fut.topologyVersion().equals(topVer))
                     cands.putAll(fut.pendingLocks());
@@ -1096,7 +1096,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
         if (exchLog.isDebugEnabled())
             exchLog.debug("Rechecking pending locks for completion.");
 
-        if (!finishFuts.isEmptyx()) {
+        if (!finishFuts.isEmpty()) {
             for (FinishLockFuture fut : finishFuts)
                 fut.recheck();
         }
@@ -1133,7 +1133,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
                     Collection<GridCacheMvccCandidate> locs = entry.localCandidates();
 
                     if (!F.isEmpty(locs)) {
-                        Collection<GridCacheMvccCandidate> cands = new ConcurrentLinkedQueue<>();
+                        Collection<GridCacheMvccCandidate> cands = new LinkedBlockingQueue<>();
 
                         cands.addAll(F.view(locs, versionFilter()));
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/ee363b9f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
index 1cdd303..04a8a07 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
@@ -17,32 +17,6 @@
 
 package org.apache.ignite.internal.processors.cache;
 
-import java.io.Externalizable;
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicBoolean;
-import javax.cache.Cache;
-import javax.cache.CacheException;
-import javax.cache.configuration.Factory;
-import javax.cache.expiry.Duration;
-import javax.cache.expiry.ExpiryPolicy;
-import javax.cache.integration.CacheWriterException;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteCheckedException;
@@ -55,7 +29,6 @@ import org.apache.ignite.cache.affinity.Affinity;
 import org.apache.ignite.cache.store.CacheStoreSessionListener;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
-import org.apache.ignite.configuration.FileSystemConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.configuration.TransactionConfiguration;
 import org.apache.ignite.internal.GridKernalContext;
@@ -103,6 +76,33 @@ import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentHashMap8;
 import org.jsr166.ConcurrentLinkedDeque8;
 
+import javax.cache.Cache;
+import javax.cache.CacheException;
+import javax.cache.configuration.Factory;
+import javax.cache.expiry.Duration;
+import javax.cache.expiry.ExpiryPolicy;
+import javax.cache.integration.CacheWriterException;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK;
 import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
 import static org.apache.ignite.cache.CacheMode.LOCAL;
@@ -797,7 +797,7 @@ public class GridCacheUtils {
      */
     public static <T> IgniteReducer<T, Collection<T>> objectsReducer() {
         return new IgniteReducer<T, Collection<T>>() {
-            private final Collection<T> ret = new ConcurrentLinkedQueue<>();
+            private final Collection<T> ret = new LinkedBlockingQueue<>();
 
             @Override public boolean collect(T item) {
                 if (item != null)

http://git-wip-us.apache.org/repos/asf/ignite/blob/ee363b9f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index e908c05..eec5271 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -17,24 +17,6 @@
 
 package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
 
-import java.io.Externalizable;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import javax.cache.expiry.ExpiryPolicy;
-import javax.cache.processor.EntryProcessor;
-import javax.cache.processor.EntryProcessorResult;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.cluster.ClusterNode;
@@ -107,7 +89,26 @@ import org.apache.ignite.plugin.security.SecurityPermission;
 import org.apache.ignite.transactions.TransactionIsolation;
 import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentHashMap8;
-import org.jsr166.ConcurrentLinkedDeque8;
+
+import javax.cache.expiry.ExpiryPolicy;
+import javax.cache.processor.EntryProcessor;
+import javax.cache.processor.EntryProcessorResult;
+import java.io.Externalizable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_ATOMIC_DEFERRED_ACK_BUFFER_SIZE;
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_ATOMIC_DEFERRED_ACK_TIMEOUT;
@@ -3175,7 +3176,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         private AtomicBoolean guard = new AtomicBoolean(false);
 
         /** Response versions. */
-        private ConcurrentLinkedDeque8<GridCacheVersion> respVers = new ConcurrentLinkedDeque8<>();
+        private LinkedBlockingDeque<GridCacheVersion> respVers = new LinkedBlockingDeque<>();
 
         /** Node ID. */
         private final UUID nodeId;
@@ -3242,7 +3243,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
                 respVers.add(ver);
 
-                if  (respVers.sizex() > DEFERRED_UPDATE_RESPONSE_BUFFER_SIZE && guard.compareAndSet(false, true))
+                if  (respVers.size() > DEFERRED_UPDATE_RESPONSE_BUFFER_SIZE && guard.compareAndSet(false, true))
                     snd = true;
             }
             finally {

http://git-wip-us.apache.org/repos/asf/ignite/blob/ee363b9f/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
index 0678657..e10b4ed 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
@@ -17,29 +17,6 @@
 
 package org.apache.ignite.internal.util.lang;
 
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.ConcurrentModificationException;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.NavigableSet;
-import java.util.NoSuchElementException;
-import java.util.RandomAccess;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
-import javax.cache.Cache;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.compute.ComputeJobResult;
@@ -77,6 +54,31 @@ import org.jsr166.ConcurrentHashMap8;
 import org.jsr166.ConcurrentLinkedDeque8;
 import org.jsr166.ThreadLocalRandom8;
 
+import javax.cache.Cache;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.ConcurrentModificationException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableSet;
+import java.util.NoSuchElementException;
+import java.util.RandomAccess;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
 /**
  * Contains factory and utility methods for {@code closures}, {@code predicates}, and {@code tuples}.
  * It also contains functional style collection comprehensions.
@@ -2332,8 +2334,8 @@ public class GridFunc {
      *      time its {@link org.apache.ignite.lang.IgniteOutClosure#apply()} method is called.
      */
     @SuppressWarnings("unchecked")
-    public static <T> IgniteCallable<ConcurrentLinkedDeque8<T>> newDeque() {
-        return (IgniteCallable<ConcurrentLinkedDeque8<T>>)DEQUE_FACTORY;
+    public static <T> IgniteCallable<LinkedBlockingDeque<T>> newDeque() {
+        return (IgniteCallable<LinkedBlockingDeque<T>>)DEQUE_FACTORY;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/ee363b9f/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
index 42c7ac7..4cc06a6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
@@ -17,6 +17,31 @@
 
 package org.apache.ignite.internal.util.nio;
 
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.configuration.ConnectorConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.util.GridConcurrentHashSet;
+import org.apache.ignite.internal.util.nio.ssl.GridNioSslFilter;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.A;
+import org.apache.ignite.internal.util.typedef.internal.LT;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.internal.util.worker.GridWorker;
+import org.apache.ignite.lang.IgniteBiInClosure;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+import org.apache.ignite.thread.IgniteThread;
+import org.jetbrains.annotations.Nullable;
+import sun.nio.ch.DirectBuffer;
+
 import java.io.IOException;
 import java.lang.reflect.Field;
 import java.net.InetAddress;
@@ -43,31 +68,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteException;
-import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.IgniteSystemProperties;
-import org.apache.ignite.configuration.ConnectorConfiguration;
-import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.IgniteInterruptedCheckedException;
-import org.apache.ignite.internal.util.GridConcurrentHashSet;
-import org.apache.ignite.internal.util.nio.ssl.GridNioSslFilter;
-import org.apache.ignite.internal.util.tostring.GridToStringExclude;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.internal.A;
-import org.apache.ignite.internal.util.typedef.internal.LT;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.internal.util.worker.GridWorker;
-import org.apache.ignite.lang.IgniteBiInClosure;
-import org.apache.ignite.lang.IgniteInClosure;
-import org.apache.ignite.lang.IgnitePredicate;
-import org.apache.ignite.plugin.extensions.communication.Message;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
-import org.apache.ignite.thread.IgniteThread;
-import org.jetbrains.annotations.Nullable;
-import sun.nio.ch.DirectBuffer;
+import java.util.concurrent.LinkedBlockingQueue;
 
 import static org.apache.ignite.internal.util.nio.GridNioSessionMetaKey.ACK_CLOSURE;
 import static org.apache.ignite.internal.util.nio.GridNioSessionMetaKey.MSG_WRITER;
@@ -1113,7 +1114,7 @@ public class GridNioServer<T> {
          */
         private void writeSslSystem(GridSelectorNioSessionImpl ses, WritableByteChannel sockCh)
             throws IOException {
-            ConcurrentLinkedQueue<ByteBuffer> queue = ses.meta(BUF_SSL_SYSTEM_META_KEY);
+            LinkedBlockingQueue<ByteBuffer> queue = ses.meta(BUF_SSL_SYSTEM_META_KEY);
 
             assert queue != null;
 
@@ -1248,7 +1249,7 @@ public class GridNioServer<T> {
      */
     private abstract class AbstractNioClientWorker extends GridWorker {
         /** Queue of change requests on this selector. */
-        private final ConcurrentLinkedQueue<NioOperationFuture> changeReqs = new ConcurrentLinkedQueue<>();
+        private final LinkedBlockingQueue<NioOperationFuture> changeReqs = new LinkedBlockingQueue<>();
 
         /** Selector to select read events. */
         private Selector selector;
@@ -2255,7 +2256,7 @@ public class GridNioServer<T> {
         /** {@inheritDoc} */
         @Override public void onSessionOpened(GridNioSession ses) throws IgniteCheckedException {
             if (directMode && sslFilter != null)
-                ses.addMeta(BUF_SSL_SYSTEM_META_KEY, new ConcurrentLinkedQueue<>());
+                ses.addMeta(BUF_SSL_SYSTEM_META_KEY, new LinkedBlockingQueue<>());
 
             proceedSessionOpened(ses);
         }
@@ -2276,7 +2277,7 @@ public class GridNioServer<T> {
                 boolean sslSys = sslFilter != null && msg instanceof ByteBuffer;
 
                 if (sslSys) {
-                    ConcurrentLinkedQueue<ByteBuffer> queue = ses.meta(BUF_SSL_SYSTEM_META_KEY);
+                    LinkedBlockingQueue<ByteBuffer> queue = ses.meta(BUF_SSL_SYSTEM_META_KEY);
 
                     assert queue != null;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/ee363b9f/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
index 1241f99..0b68d5a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
@@ -17,18 +17,19 @@
 
 package org.apache.ignite.internal.util.nio;
 
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.typedef.internal.LT;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.jetbrains.annotations.Nullable;
+
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.nio.channels.SelectionKey;
 import java.util.Collection;
+import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.internal.util.tostring.GridToStringExclude;
-import org.apache.ignite.internal.util.typedef.internal.LT;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.jetbrains.annotations.Nullable;
-import org.jsr166.ConcurrentLinkedDeque8;
 
 /**
  * Session implementation bound to selector API and socket API.
@@ -37,7 +38,7 @@ import org.jsr166.ConcurrentLinkedDeque8;
  */
 class GridSelectorNioSessionImpl extends GridNioSessionImpl {
     /** Pending write requests. */
-    private final ConcurrentLinkedDeque8<GridNioFuture<?>> queue = new ConcurrentLinkedDeque8<>();
+    private final LinkedBlockingDeque<GridNioFuture<?>> queue = new LinkedBlockingDeque<>();
 
     /** Selection key associated with this session. */
     @GridToStringExclude

http://git-wip-us.apache.org/repos/asf/ignite/blob/ee363b9f/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index b283b82..0611f46 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -17,37 +17,6 @@
 
 package org.apache.ignite.spi.communication.tcp;
 
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.net.ConnectException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.SocketTimeoutException;
-import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
-import java.nio.channels.SelectableChannel;
-import java.nio.channels.SocketChannel;
-import java.nio.channels.spi.AbstractInterruptibleChannel;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import javax.net.ssl.SSLEngine;
-import javax.net.ssl.SSLException;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteClientDisconnectedException;
@@ -126,9 +95,41 @@ import org.apache.ignite.spi.communication.CommunicationListener;
 import org.apache.ignite.spi.communication.CommunicationSpi;
 import org.apache.ignite.thread.IgniteThread;
 import org.jetbrains.annotations.Nullable;
-import org.jsr166.ConcurrentLinkedDeque8;
 import org.jsr166.LongAdder8;
 
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.SSLException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.ConnectException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketTimeoutException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.channels.SelectableChannel;
+import java.nio.channels.SocketChannel;
+import java.nio.channels.spi.AbstractInterruptibleChannel;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
 import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
 import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
 
@@ -803,7 +804,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
     private ShmemAcceptWorker shmemAcceptWorker;
 
     /** Shared memory workers. */
-    private final Collection<ShmemWorker> shmemWorkers = new ConcurrentLinkedDeque8<>();
+    private final Collection<ShmemWorker> shmemWorkers = new LinkedBlockingDeque<>();
 
     /** Clients. */
     private final ConcurrentMap<UUID, GridCommunicationClient> clients = GridConcurrentFactory.newMap();


Mime
View raw message