ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [10/19] ignite git commit: IGNITE-5155
Date Sat, 10 Jun 2017 06:20:41 GMT
IGNITE-5155


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f150144e
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f150144e
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f150144e

Branch: refs/heads/ignite-5155
Commit: f150144e15b74cfdbe8d4ac5e1fe8a30b08b6735
Parents: 0f9f317
Author: Igor Seliverstov <gvvinblade@gmail.com>
Authored: Thu Jun 8 19:21:30 2017 +0300
Committer: Igor Seliverstov <gvvinblade@gmail.com>
Committed: Fri Jun 9 10:56:41 2017 +0300

----------------------------------------------------------------------
 .../ignite/internal/IgniteDiagnosticAware.java  |   2 +-
 .../internal/IgniteDiagnosticMessage.java       | 283 +++++++++++--------
 .../processors/GridProcessorAdapter.java        |   4 -
 .../processors/cache/GridCacheIoManager.java    |   4 +-
 .../GridCachePartitionExchangeManager.java      |  71 ++---
 .../cache/GridCacheSharedManagerAdapter.java    |   3 -
 .../distributed/dht/GridDhtTxPrepareFuture.java |   4 +-
 .../dht/GridPartitionedSingleGetFuture.java     |   4 +-
 .../colocated/GridDhtColocatedLockFuture.java   |   4 +-
 .../GridDhtPartitionsExchangeFuture.java        |   9 +-
 .../processors/cluster/ClusterProcessor.java    |  20 +-
 11 files changed, 220 insertions(+), 188 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/f150144e/modules/core/src/main/java/org/apache/ignite/internal/IgniteDiagnosticAware.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteDiagnosticAware.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteDiagnosticAware.java
index a37d924..c5e3ab8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteDiagnosticAware.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteDiagnosticAware.java
@@ -21,5 +21,5 @@ package org.apache.ignite.internal;
  *
  */
 public interface IgniteDiagnosticAware {
-    public void prepareRequest(IgniteDiagnosticMessage.PreparedRequest context);
+    public void prepareRequest(IgniteDiagnosticMessage.PreparedRequest req);
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/f150144e/modules/core/src/main/java/org/apache/ignite/internal/IgniteDiagnosticMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteDiagnosticMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteDiagnosticMessage.java
index f6a158b..06572a3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteDiagnosticMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteDiagnosticMessage.java
@@ -27,7 +27,6 @@ import java.util.Comparator;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -49,6 +48,7 @@ import org.apache.ignite.internal.util.future.GridCompoundFuture;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiInClosure;
 import org.apache.ignite.lang.IgniteClosure;
 import org.apache.ignite.lang.IgniteReducer;
 import org.apache.ignite.plugin.extensions.communication.Message;
@@ -247,61 +247,7 @@ public class IgniteDiagnosticMessage implements Message {
     /**
      *
      */
-    public static class BaseClosure implements IgniteClosure<GridKernalContext, String> {
-        /** */
-        private static final long serialVersionUID = 0L;
-
-        /** */
-        protected final UUID nodeId;
-
-        /**
-         * @param nodeId Local node context.
-         */
-        public BaseClosure(UUID nodeId) {
-            this.nodeId = nodeId;
-        }
-
-        /** {@inheritDoc} */
-        @Override public final String apply(GridKernalContext ctx) {
-            try {
-                IgniteInternalFuture<String> commInfo = dumpCommunicationInfo(ctx, nodeId);
-
-                StringBuilder sb = new StringBuilder();
-
-                dumpNodeBasicInfo(sb, ctx);
-
-                sb.append(U.nl());
-
-                dumpExchangeInfo(sb, ctx);
-
-                sb.append(U.nl());
-
-                sb.append(commInfo.get(10_000));
-
-                moreInfo(sb, ctx);
-
-                return sb.toString();
-            }
-            catch (Exception e) {
-                ctx.cluster().diagnosticLog().error("Failed to execute diagnostic message closure: " + e, e);
-
-                return "Failed to execute diagnostic message closure: " + e;
-            }
-        }
-
-        /**
-         * @param sb String builder.
-         * @param ctx Context.
-         */
-        protected void moreInfo(StringBuilder sb, GridKernalContext ctx) {
-            // No-op.
-        }
-    }
-
-    /**
-     *
-     */
-    public final static class TxEntriesInfoClosure extends BaseClosure {
+    public final static class TxEntriesInfoClosure implements IgniteBiInClosure<StringBuilder, GridKernalContext> {
         /** */
         private static final long serialVersionUID = 0L;
 
@@ -309,22 +255,25 @@ public class IgniteDiagnosticMessage implements Message {
         private final int cacheId;
 
         /** */
-        private final Collection<KeyCacheObject> keys;
+        private final Set<KeyCacheObject> keys;
 
         /**
-         * @param nodeId Local node ID.
          * @param cacheId Cache ID.
          * @param keys Keys.
          */
-        public TxEntriesInfoClosure(UUID nodeId, int cacheId, Collection<KeyCacheObject> keys) {
-            super(nodeId);
-
+        TxEntriesInfoClosure(int cacheId, Collection<KeyCacheObject> keys) {
             this.cacheId = cacheId;
-            this.keys = keys;
+            this.keys = new HashSet<>(keys);
+        }
+
+        public void merge(TxEntriesInfoClosure other) {
+            assert other != null && cacheId == other.cacheId : other;
+
+            this.keys.addAll(other.keys);
         }
 
         /** {@inheritDoc} */
-        @Override protected void moreInfo(StringBuilder sb, GridKernalContext ctx) {
+        @Override public void apply(StringBuilder sb, GridKernalContext ctx) {
             sb.append(U.nl());
 
             GridCacheContext cctx = ctx.cache().context().cacheContext(cacheId);
@@ -355,6 +304,7 @@ public class IgniteDiagnosticMessage implements Message {
             }
         }
 
+        /** {@inheritDoc} */
         @Override public boolean equals(Object o) {
             if (this == o)
                 return true;
@@ -364,6 +314,7 @@ public class IgniteDiagnosticMessage implements Message {
             return cacheId == closure.cacheId;
         }
 
+        /** {@inheritDoc} */
         @Override public int hashCode() {
             return Objects.hash(cacheId);
         }
@@ -372,7 +323,7 @@ public class IgniteDiagnosticMessage implements Message {
     /**
      *
      */
-    public final static class ExchangeInfoClosure extends BaseClosure {
+    public final static class ExchangeInfoClosure implements IgniteBiInClosure<StringBuilder, GridKernalContext> {
         /** */
         private static final long serialVersionUID = 0L;
 
@@ -380,17 +331,14 @@ public class IgniteDiagnosticMessage implements Message {
         private final AffinityTopologyVersion topVer;
 
         /**
-         * @param nodeId Local node ID.
          * @param topVer Exchange version.
          */
-        public ExchangeInfoClosure(UUID nodeId, AffinityTopologyVersion topVer) {
-            super(nodeId);
-
+        ExchangeInfoClosure(AffinityTopologyVersion topVer) {
             this.topVer = topVer;
         }
 
         /** {@inheritDoc} */
-        @Override protected void moreInfo(StringBuilder sb, GridKernalContext ctx) {
+        @Override public void apply(StringBuilder sb, GridKernalContext ctx) {
             sb.append(U.nl());
 
             List<GridDhtPartitionsExchangeFuture> futs = ctx.cache().context().exchange().exchangeFutures();
@@ -406,6 +354,7 @@ public class IgniteDiagnosticMessage implements Message {
             sb.append("Failed to find exchange future: ").append(topVer);
         }
 
+        /** {@inheritDoc} */
         @Override public boolean equals(Object o) {
             if (this == o)
                 return true;
@@ -415,6 +364,7 @@ public class IgniteDiagnosticMessage implements Message {
             return Objects.equals(topVer, closure.topVer);
         }
 
+        /** {@inheritDoc} */
         @Override public int hashCode() {
             return Objects.hash(topVer);
         }
@@ -423,7 +373,7 @@ public class IgniteDiagnosticMessage implements Message {
     /**
      *
      */
-    public final static class TxInfoClosure extends BaseClosure {
+    public final static class TxInfoClosure implements IgniteBiInClosure<StringBuilder, GridKernalContext> {
         /** */
         private static final long serialVersionUID = 0L;
 
@@ -434,21 +384,16 @@ public class IgniteDiagnosticMessage implements Message {
         private final GridCacheVersion nearVer;
 
         /**
-         * @param nodeId Local node ID.
          * @param dhtVer Tx dht version.
          * @param nearVer Tx near version.
          */
-        public TxInfoClosure(UUID nodeId,
-            GridCacheVersion dhtVer,
-            GridCacheVersion nearVer) {
-            super(nodeId);
-
+        TxInfoClosure(GridCacheVersion dhtVer, GridCacheVersion nearVer) {
             this.dhtVer = dhtVer;
             this.nearVer = nearVer;
         }
 
         /** {@inheritDoc} */
-        @Override protected void moreInfo(StringBuilder sb, GridKernalContext ctx) {
+        @Override public void apply(StringBuilder sb, GridKernalContext ctx) {
             sb.append(U.nl())
                 .append("Related transactions [dhtVer=").append(dhtVer)
                 .append(", nearVer=").append(nearVer).append("]: ");
@@ -472,6 +417,7 @@ public class IgniteDiagnosticMessage implements Message {
                 sb.append(U.nl()).append("Failed to find related transactions.");
         }
 
+        /** {@inheritDoc} */
         @Override public boolean equals(Object o) {
             if (this == o)
                 return true;
@@ -482,6 +428,7 @@ public class IgniteDiagnosticMessage implements Message {
                 Objects.equals(nearVer, closure.nearVer);
         }
 
+        /** {@inheritDoc} */
         @Override public int hashCode() {
             return Objects.hash(dhtVer, nearVer);
         }
@@ -490,20 +437,52 @@ public class IgniteDiagnosticMessage implements Message {
     /**
      *
      */
-    public final static class CompoundInfoClosure extends BaseClosure {
+    private final static class CompoundInfoClosure implements IgniteClosure<GridKernalContext, String> {
         /** */
         private static final long serialVersionUID = 0L;
 
         /** */
-        private Set<BaseClosure> cls = new HashSet<>();
+        protected final UUID nodeId;
+
+        /** */
+        private Set<IgniteBiInClosure<StringBuilder, GridKernalContext>> cls = new HashSet<>();
 
+        /** */
         private transient Map<Integer, StringBuilder> msgs = new TreeMap<>();
 
         /**
          * @param nodeId Local node ID.
          */
-        public CompoundInfoClosure(UUID nodeId) {
-            super(nodeId);
+        CompoundInfoClosure(UUID nodeId) {
+            this.nodeId = nodeId;
+        }
+
+        /** {@inheritDoc} */
+        @Override public final String apply(GridKernalContext ctx) {
+            try {
+                IgniteInternalFuture<String> commInfo = dumpCommunicationInfo(ctx,
nodeId);
+
+                StringBuilder sb = new StringBuilder();
+
+                dumpNodeBasicInfo(sb, ctx);
+
+                sb.append(U.nl());
+
+                dumpExchangeInfo(sb, ctx);
+
+                sb.append(U.nl());
+
+                sb.append(commInfo.get(10_000));
+
+                moreInfo(sb, ctx);
+
+                return sb.toString();
+            }
+            catch (Exception e) {
+                ctx.cluster().diagnosticLog().error("Failed to execute diagnostic message
closure: " + e, e);
+
+                return "Failed to execute diagnostic message closure: " + e;
+            }
         }
 
         /**
@@ -513,6 +492,9 @@ public class IgniteDiagnosticMessage implements Message {
             return nodeId;
         }
 
+        /**
+         * @return Initial message.
+         */
         public String message() {
             if(msgs.isEmpty())
                 return "";
@@ -525,9 +507,11 @@ public class IgniteDiagnosticMessage implements Message {
             return sb.toString();
         }
 
-        public <T extends BaseClosure> void add(String msg, T cl) {
-            assert cl == null || cl.nodeId == nodeId;
-
+        /**
+         * @param msg Message.
+         * @param cl Closure.
+         */
+        public void add(String msg, IgniteBiInClosure<StringBuilder, GridKernalContext> cl) {
             if(msg != null && !msg.isEmpty()) {
                 addMessage(messageType(cl), msg);
             }
@@ -538,21 +522,22 @@ public class IgniteDiagnosticMessage implements Message {
             if(!cls.add(cl) && cl.getClass() == TxEntriesInfoClosure.class) {
                 // The entry with the same cache id is already exist, need to merge its keys.
                 TxEntriesInfoClosure cl0 = (TxEntriesInfoClosure)cl;
-                TxEntriesInfoClosure cur = getExisted(cl0);
-
-                Collection<KeyCacheObject> keys = new HashSet<>();
-
-                keys.addAll(cur.keys);
-                keys.addAll(cl0.keys);
-
-                cls.add(new TxEntriesInfoClosure(cur.nodeId, cur.cacheId, keys));
+                getExisted(cl0.cacheId).merge(cl0);
             }
         }
 
-        private <T extends BaseClosure> int messageType(T cl) {
+        /**
+         * @param cl Closure.
+         * @return Message type for given closure.
+         */
+        private int messageType(IgniteBiInClosure<StringBuilder, GridKernalContext> cl) {
             return TypeComparator.INSTANCE.order(cl);
         }
 
+        /**
+         * @param msgType Message type.
+         * @param msg Message.
+         */
         private void addMessage(int msgType, String msg) {
             StringBuilder sb = msgs.get(msgType);
 
@@ -566,34 +551,31 @@ public class IgniteDiagnosticMessage implements Message {
         }
 
         /**
-         * @param cl Closure.
+         * @param cacheId Cache ID.
          * @return Existed closure.
          */
-        private TxEntriesInfoClosure getExisted(TxEntriesInfoClosure cl) {
-            Iterator<BaseClosure> it = cls.iterator();
-
-            while (it.hasNext()) {
-                BaseClosure cl0 = it.next();
-
-                if (cl0.equals(cl)) {
-                    it.remove();
-
+        private TxEntriesInfoClosure getExisted(int cacheId) {
+            for (IgniteBiInClosure<StringBuilder, GridKernalContext> cl0 : cls) {
+                if (TxEntriesInfoClosure.class == cl0.getClass() &&
+                    ((TxEntriesInfoClosure)cl0).cacheId == cacheId)
                     return (TxEntriesInfoClosure)cl0;
-                }
             }
 
             throw new AssertionError("Existed closure is not found");
         }
 
-        /** {@inheritDoc} */
-        @Override protected void moreInfo(StringBuilder sb, GridKernalContext ctx) {
-            ArrayList<BaseClosure> cls = new ArrayList<>(this.cls);
+        /**
+         * @param sb String builder.
+         * @param ctx Grid context.
+         */
+        private void moreInfo(StringBuilder sb, GridKernalContext ctx) {
+            ArrayList<IgniteBiInClosure<StringBuilder, GridKernalContext>> cls = new ArrayList<>(this.cls);
 
             Collections.sort(cls, TypeComparator.INSTANCE);
 
             for (int i = 0; i < cls.size(); i++) {
                 try {
-                    cls.get(i).moreInfo(sb, ctx);
+                    cls.get(i).apply(sb, ctx);
                 }
                 catch (Exception e) {
                     ctx.cluster().diagnosticLog().error("Failed to populate diagnostic with
additional information: " + e, e);
@@ -604,31 +586,76 @@ public class IgniteDiagnosticMessage implements Message {
         }
     }
 
-    public static class PreparedRequest {
+    /**
+     *
+     */
+    public static final class PreparedRequest {
+        /** */
         final UUID localNodeId;
 
+        /** */
         final Map<UUID, CompoundInfoClosure> cls = new HashMap<>();
 
-        public PreparedRequest(UUID id) {
-            localNodeId = id;
+        /**
+         * @param nodeId Local node ID.
+         */
+        public PreparedRequest(UUID nodeId) {
+            localNodeId = nodeId;
         }
 
-        public void exchangeInfo(UUID nodeId, AffinityTopologyVersion topVer, String msg)
{
-            closure(nodeId).add(msg, new ExchangeInfoClosure(localNodeId, topVer));
+        /**
+         * @param nodeId Remote node ID.
+         * @param topVer Topology version.
+         * @param msg Initial message.
+         * @return Prepared request.
+         */
+        public PreparedRequest exchangeInfo(UUID nodeId, AffinityTopologyVersion topVer,
String msg) {
+            closure(nodeId).add(msg, new ExchangeInfoClosure(topVer));
+
+            return this;
         }
 
-        public void txKeyInfo(UUID nodeId, int cacheId, Collection<KeyCacheObject>
keys, String msg) {
-            closure(nodeId).add(msg, new TxEntriesInfoClosure(localNodeId, cacheId, keys));
+        /**
+         * @param nodeId Remote node ID.
+         * @param cacheId Cache ID.
+         * @param keys Entry keys.
+         * @param msg Initial message.
+         * @return Prepared request.
+         */
+        public PreparedRequest txKeyInfo(UUID nodeId, int cacheId, Collection<KeyCacheObject>
keys, String msg) {
+            closure(nodeId).add(msg, new TxEntriesInfoClosure(cacheId, keys));
+
+            return this;
         }
 
-        public void remoteTxInfo(UUID nodeId, GridCacheVersion dhtVer, GridCacheVersion nearVer,
String msg) {
-            closure(nodeId).add(msg, new TxInfoClosure(localNodeId, dhtVer, nearVer));
+        /**
+         * @param nodeId Remote node ID.
+         * @param dhtVer Tx dht version.
+         * @param nearVer Tx near version.
+         * @param msg Initial message.
+         * @return Prepared request.
+         */
+        public PreparedRequest remoteTxInfo(UUID nodeId, GridCacheVersion dhtVer, GridCacheVersion
nearVer, String msg) {
+            closure(nodeId).add(msg, new TxInfoClosure(dhtVer, nearVer));
+
+            return this;
         }
 
-        public void basicInfo(final UUID nodeId, final String msg) {
+        /**
+         * @param nodeId Remote node ID.
+         * @param msg Initial message.
+         * @return Prepared request.
+         */
+        public PreparedRequest basicInfo(final UUID nodeId, final String msg) {
             closure(nodeId).add(msg, null);
+
+            return this;
         }
 
+        /**
+         * @param ctx Grid context.
+         * @return Future.
+         */
         public IgniteInternalFuture<String> send(GridKernalContext ctx) {
             IgniteReducer<String, String> reducer = new IgniteReducer<String, String>()
{
                 private final StringBuilder sb = new StringBuilder();
@@ -663,6 +690,10 @@ public class IgniteDiagnosticMessage implements Message {
             return fut;
         }
 
+        /**
+         * @param nodeId Remote node ID.
+         * @return Compound closure
+         */
         private CompoundInfoClosure closure(UUID nodeId) {
             CompoundInfoClosure cl = cls.get(nodeId);
 
@@ -671,18 +702,16 @@ public class IgniteDiagnosticMessage implements Message {
 
             return cl;
         }
-
-
     }
 
     /**
      *
      */
-    private static final class TypeComparator implements Comparator<BaseClosure> {
+    private static final class TypeComparator implements Comparator<IgniteBiInClosure<StringBuilder, GridKernalContext>> {
         public static final TypeComparator INSTANCE = new TypeComparator();
 
         /** {@inheritDoc} */
-        @Override public int compare(BaseClosure c1, BaseClosure c2) {
+        @Override public int compare(IgniteBiInClosure<StringBuilder, GridKernalContext> c1, IgniteBiInClosure<StringBuilder, GridKernalContext> c2) {
             return Integer.compare(order(c1), order(c2));
         }
 
@@ -690,7 +719,7 @@ public class IgniteDiagnosticMessage implements Message {
          * @param cl Closure.
          * @return Type order.
          */
-        private int order(BaseClosure cl) {
+        private int order(IgniteBiInClosure<StringBuilder, GridKernalContext> cl) {
             if (cl == null)
                 return 0;
 
@@ -743,6 +772,14 @@ public class IgniteDiagnosticMessage implements Message {
     }
 
     /**
+     * @param nodeId Local node ID.
+     * @return New prepared request.
+     */
+    public static PreparedRequest newRequest(UUID nodeId) {
+        return new PreparedRequest(nodeId);
+    }
+
+    /**
      * @param time Time.
      * @return Time string.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/f150144e/modules/core/src/main/java/org/apache/ignite/internal/processors/GridProcessorAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/GridProcessorAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/GridProcessorAdapter.java
index 75b43c1..b9d7260 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/GridProcessorAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/GridProcessorAdapter.java
@@ -37,10 +37,6 @@ import org.jetbrains.annotations.Nullable;
  */
 public abstract class GridProcessorAdapter implements GridProcessor {
     /** */
-    protected final boolean DIAGNOSTIC_ENABLED =
-        IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_DIAGNOSTIC_ENABLED, false);
-
-    /** */
     private static final String DIAGNOSTIC_LOG_CATEGORY = "org.apache.ignite.internal.diagnostic";
 
     /** Kernal context. */

http://git-wip-us.apache.org/repos/asf/ignite/blob/f150144e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index e66b09e..9a980d8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -144,12 +144,12 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
             if (pendingMsgs.isEmpty())
                 return;
 
-            log.info("Pending cache messages waiting for exchange [" +
+            diagnosticLog.info("Pending cache messages waiting for exchange [" +
                 "readyVer=" + cctx.exchange().readyAffinityVersion() +
                 ", discoVer=" + cctx.discovery().topologyVersion() + ']');
 
             for (GridCacheMessage msg : pendingMsgs)
-                log.info("Message [waitVer=" + msg.topologyVersion() + ", msg=" + msg + ']');
+                diagnosticLog.info("Message [waitVer=" + msg.topologyVersion() + ", msg="
+ msg + ']');
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/f150144e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index a9307a0..e762f46 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -119,6 +119,8 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.preloa
  * Partition exchange manager.
  */
 public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedManagerAdapter<K,
V> {
+    /** */
+    private final boolean DIAGNOSTIC_ENABLED = IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_DIAGNOSTIC_ENABLED, false);
     /** Exchange history size. */
     private static final int EXCHANGE_HISTORY_SIZE = 1000;
 
@@ -1370,9 +1372,6 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
      * @throws Exception If failed.
      */
     public void dumpDebugInfo(@Nullable AffinityTopologyVersion exchTopVer) throws Exception
{
-        if(!DIAGNOSTIC_ENABLED)
-            return;
-
         U.warn(diagnosticLog, "Ready affinity version: " + readyTopVer.get());
 
         U.warn(diagnosticLog, "Last exchange future: " + lastInitializedFut);
@@ -1386,7 +1385,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                 U.warn(diagnosticLog, ">>> " + fut);
         }
 
-        IgniteDiagnosticMessage.PreparedRequest req = new IgniteDiagnosticMessage.PreparedRequest(cctx.localNodeId());
+        IgniteDiagnosticMessage.PreparedRequest req =
+            DIAGNOSTIC_ENABLED ? new IgniteDiagnosticMessage.PreparedRequest(cctx.localNodeId())
: null;
 
         ExchangeFutureSet exchFuts = this.exchFuts;
 
@@ -1398,7 +1398,9 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
             for (GridDhtPartitionsExchangeFuture fut : exchFuts.values()) {
                 U.warn(diagnosticLog, ">>> " + fut.shortInfo());
 
-                fut.prepareRequest(req);
+
+                if (req != null)
+                    fut.prepareRequest(req);
 
                 if (++cnt == 10)
                     break;
@@ -1407,11 +1409,19 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
 
         dumpPendingObjects(exchTopVer, req);
 
-        try {
-            U.warn(diagnosticLog, req.send(cctx.kernalContext()).get(2 * cctx.gridConfig().getNetworkTimeout()));
-        }
-        catch (IgniteCheckedException e) {
-            U.error(diagnosticLog, "Failed to dump remote diagnostic info: " + e, e);
+        if (req != null) {
+            IgniteInternalFuture<String> fut = req.send(cctx.kernalContext());
+
+            fut.listen(new CI1<IgniteInternalFuture<String>>() {
+                @Override public void apply(IgniteInternalFuture<String> future) {
+                    try {
+                        U.warn(diagnosticLog, future.get());
+                    }
+                    catch (IgniteCheckedException e) {
+                        U.error(diagnosticLog, "Failed to dump remote diagnostic info: "
+ e, e);
+                    }
+                }
+            });
         }
 
         for (CacheGroupContext grp : cctx.cache().cacheGroups())
@@ -1421,9 +1431,11 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
 
         cctx.io().dumpPendingMessages();
 
-        // Dump IO manager statistics.
         if (IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_IO_DUMP_ON_TIMEOUT,
false))
             cctx.gridIO().dumpStats();
+
+        if (IgniteSystemProperties.getBoolean(IGNITE_THREAD_DUMP_ON_EXCHANGE_TIMEOUT, false))
+            U.dumpThreads(diagnosticLog);
     }
 
     /**
@@ -1490,22 +1502,16 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
             if (dumpLongRunningOperations0(timeout)) {
                 nextLongRunningOpsDumpTime = U.currentTimeMillis() + nextDumpTimeout(longRunningOpsDumpStep++,
timeout);
 
-                if (DIAGNOSTIC_ENABLED) {
-                    if (IgniteSystemProperties.getBoolean(IGNITE_THREAD_DUMP_ON_EXCHANGE_TIMEOUT,
false)) {
-                        U.warn(diagnosticLog, "Found long running cache operations, dump
threads.");
-
-                        U.dumpThreads(diagnosticLog);
-                    }
-
-
-
-                    // Dump IO manager statistics.
-                    if (IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_IO_DUMP_ON_TIMEOUT,
false)) {
-                        U.warn(diagnosticLog, "Found long running cache operations, dump
IO statistics.");
-
-                        cctx.gridIO().dumpStats();
+                cctx.kernalContext().closure().runLocalSafe(new Runnable() {
+                    @Override public void run() {
+                        try {
+                            dumpDebugInfo();
+                        }
+                        catch (Exception e) {
+                            U.error(diagnosticLog, "Failed to dump debug information: " +
e, e);
+                        }
                     }
-                }
+                });
             }
             else {
                 nextLongRunningOpsDumpTime = 0;
@@ -1537,10 +1543,10 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
 
     /**
      * @param exchTopVer Exchange topology version.
-     * @param req
+     * @param req Prepared diagnostic request.
      */
     private void dumpPendingObjects(@Nullable AffinityTopologyVersion exchTopVer,
-        IgniteDiagnosticMessage.PreparedRequest req) {
+        @Nullable IgniteDiagnosticMessage.PreparedRequest req) {
         IgniteTxManager tm = cctx.tm();
 
         if (tm != null) {
@@ -1607,13 +1613,13 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
     /**
      * Logs the future and dumps diagnostic info if needed.
      * @param fut Future.
-     * @param req
+     * @param req Prepared diagnostic request.
      */
     private void dumpDiagnosticInfo(IgniteInternalFuture<?> fut,
-        IgniteDiagnosticMessage.PreparedRequest req) {
+        @Nullable IgniteDiagnosticMessage.PreparedRequest req) {
         U.warn(diagnosticLog, ">>> " + fut);
 
-        if (fut instanceof IgniteDiagnosticAware)
+        if (req != null && fut instanceof IgniteDiagnosticAware)
             ((IgniteDiagnosticAware)fut).prepareRequest(req);
     }
 
@@ -1843,9 +1849,6 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                                             U.error(log, "Failed to dump debug information:
" + e, e);
                                         }
 
-                                        if (IgniteSystemProperties.getBoolean(IGNITE_THREAD_DUMP_ON_EXCHANGE_TIMEOUT,
false))
-                                            U.dumpThreads(log);
-
                                         nextDumpTime = U.currentTimeMillis() + nextDumpTimeout(dumpCnt++,
futTimeout);
                                     }
                                 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/f150144e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedManagerAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedManagerAdapter.java
index 14fe57f..8ae8679 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedManagerAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedManagerAdapter.java
@@ -28,9 +28,6 @@ import org.apache.ignite.lang.IgniteFuture;
  * Convenience adapter for cache managers.
  */
 public class GridCacheSharedManagerAdapter<K, V> implements GridCacheSharedManager<K, V> {
-    /** */
-    protected final boolean DIAGNOSTIC_ENABLED =
-        IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_DIAGNOSTIC_ENABLED, false);
 
     /** */
     private static final String DIAGNOSTIC_LOG_CATEGORY = "org.apache.ignite.internal.diagnostic";

http://git-wip-us.apache.org/repos/asf/ignite/blob/f150144e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index 5f77716..c4dc60f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -1569,7 +1569,7 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<Ignite
     }
 
     /** {@inheritDoc} */
-    @Override public void prepareRequest(IgniteDiagnosticMessage.PreparedRequest context) {
+    @Override public void prepareRequest(IgniteDiagnosticMessage.PreparedRequest req) {
         if (!isDone()) {
             for (IgniteInternalFuture fut : futures()) {
                 if (!fut.isDone() && fut instanceof MiniFuture) {
@@ -1579,7 +1579,7 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<Ignite
                         GridCacheVersion dhtVer = tx.xidVersion();
                         GridCacheVersion nearVer = tx.nearXidVersion();
 
-                        context.remoteTxInfo(f.nodeId, dhtVer, nearVer, "GridDhtTxPrepareFuture " +
+                        req.remoteTxInfo(f.nodeId, dhtVer, nearVer, "GridDhtTxPrepareFuture " +
                             "waiting for response [node=" + f.nodeId +
                             ", topVer=" + tx.topologyVersion() +
                             ", dhtVer=" + dhtVer +

http://git-wip-us.apache.org/repos/asf/ignite/blob/f150144e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
index 17ca659..cc77ce0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
@@ -769,7 +769,7 @@ public class GridPartitionedSingleGetFuture extends GridCacheFutureAdapter<Objec
     }
 
     /** {@inheritDoc} */
-    @Override public void prepareRequest(IgniteDiagnosticMessage.PreparedRequest context) {
+    @Override public void prepareRequest(IgniteDiagnosticMessage.PreparedRequest req) {
         if (!isDone()) {
             UUID nodeId;
             AffinityTopologyVersion topVer;
@@ -780,7 +780,7 @@ public class GridPartitionedSingleGetFuture extends GridCacheFutureAdapter<Objec
             }
 
             if (nodeId != null)
-                context.basicInfo(nodeId, "GridPartitionedSingleGetFuture waiting for " +
+                req.basicInfo(nodeId, "GridPartitionedSingleGetFuture waiting for " +
                     "response [node=" + nodeId +
                     ", key=" + key +
                     ", futId=" + futId +

http://git-wip-us.apache.org/repos/asf/ignite/blob/f150144e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
index a55888f..a6e63e9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
@@ -598,7 +598,7 @@ public final class GridDhtColocatedLockFuture extends GridCacheCompoundIdentityF
     }
 
     /** {@inheritDoc} */
-    @Override public void prepareRequest(IgniteDiagnosticMessage.PreparedRequest context) {
+    @Override public void prepareRequest(IgniteDiagnosticMessage.PreparedRequest req) {
         if (!isDone()) {
             for (IgniteInternalFuture fut : futures()) {
                 if (!fut.isDone() && isMini(fut)) {
@@ -616,7 +616,7 @@ public final class GridDhtColocatedLockFuture extends GridCacheCompoundIdentityF
                     }
 
                     if (rmtNodeId != null) {
-                        context.txKeyInfo(rmtNodeId, cctx.cacheId(), m.keys,
+                        req.txKeyInfo(rmtNodeId, cctx.cacheId(), m.keys,
                             "GridDhtColocatedLockFuture waiting for response [node=" + rmtNodeId +
                             ", cache=" + cctx.name() +
                             ", miniId=" + m.futId +

http://git-wip-us.apache.org/repos/asf/ignite/blob/f150144e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index ddd4dc3..f8f5020 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -1030,9 +1030,6 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
         catch (Exception e) {
             U.error(log, "Failed to dump debug information: " + e, e);
         }
-
-        if (getBoolean(IGNITE_THREAD_DUMP_ON_EXCHANGE_TIMEOUT, false))
-            U.dumpThreads(log);
     }
 
     /**
@@ -2159,7 +2156,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
     }
 
     /** {@inheritDoc} */
-    @Override public void prepareRequest(IgniteDiagnosticMessage.PreparedRequest context) {
+    @Override public void prepareRequest(IgniteDiagnosticMessage.PreparedRequest req) {
         if (!isDone()) {
             ClusterNode crd;
             Set<UUID> remaining;
@@ -2171,13 +2168,13 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
 
             if (crd != null) {
                 if (!crd.isLocal()) {
-                    context.exchangeInfo(crd.id(), topologyVersion(), "Exchange future waiting for coordinator " +
+                    req.exchangeInfo(crd.id(), topologyVersion(), "Exchange future waiting for coordinator " +
                         "response [crd=" + crd.id() + ", topVer=" + topologyVersion() + ']');
                 }
                 else if (!remaining.isEmpty()){
                     UUID nodeId = remaining.iterator().next();
 
-                    context.exchangeInfo(crd.id(), topologyVersion(), "Exchange future waiting for server " +
+                    req.exchangeInfo(crd.id(), topologyVersion(), "Exchange future waiting for server " +
                         "response [node=" + nodeId + ", topVer=" + topologyVersion() + ']');
                 }
             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/f150144e/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java
index 0991ac8..b89f00d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java
@@ -30,11 +30,11 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.events.DiscoveryEvent;
 import org.apache.ignite.events.Event;
 import org.apache.ignite.internal.GridKernalContext;
-import org.apache.ignite.internal.GridTopic;
 import org.apache.ignite.internal.IgniteDiagnosticMessage;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteKernal;
@@ -44,9 +44,6 @@ import org.apache.ignite.internal.managers.communication.GridIoPolicy;
 import org.apache.ignite.internal.managers.communication.GridMessageListener;
 import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
 import org.apache.ignite.internal.processors.GridProcessorAdapter;
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.cache.KeyCacheObject;
-import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.GridTimerTask;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
@@ -65,6 +62,7 @@ import org.jetbrains.annotations.Nullable;
 import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
 import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
 import static org.apache.ignite.internal.GridComponent.DiscoveryDataExchangeType.CLUSTER_PROC;
+import static org.apache.ignite.internal.GridTopic.TOPIC_INTERNAL_DIAGNOSTIC;
 import static org.apache.ignite.internal.IgniteVersionUtils.VER_STR;
 
 /**
@@ -135,7 +133,7 @@ public class ClusterProcessor extends GridProcessorAdapter {
             },
             EVT_NODE_FAILED, EVT_NODE_LEFT);
 
-        ctx.io().addMessageListener(GridTopic.TOPIC_INTERNAL_DIAGNOSTIC, new GridMessageListener() {
+        ctx.io().addMessageListener(TOPIC_INTERNAL_DIAGNOSTIC, new GridMessageListener() {
             @Override public void onMessage(UUID nodeId, Object msg) {
                 if (msg instanceof IgniteDiagnosticMessage) {
                     IgniteDiagnosticMessage msg0 = (IgniteDiagnosticMessage)msg;
@@ -170,7 +168,7 @@ public class ClusterProcessor extends GridProcessorAdapter {
                         IgniteDiagnosticMessage res = IgniteDiagnosticMessage.createResponse(resMsg, msg0.futureId());
 
                         try {
-                            ctx.io().sendToGridTopic(node, GridTopic.TOPIC_INTERNAL_DIAGNOSTIC, res, GridIoPolicy.SYSTEM_POOL);
+                            ctx.io().sendToGridTopic(node, TOPIC_INTERNAL_DIAGNOSTIC, res, GridIoPolicy.SYSTEM_POOL);
                         }
                         catch (ClusterTopologyCheckedException e) {
                             if (diagnosticLog.isDebugEnabled()) {
@@ -311,6 +309,7 @@ public class ClusterProcessor extends GridProcessorAdapter {
         if (verChecker != null)
             verChecker.stop();
 
+        ctx.io().removeMessageListener(TOPIC_INTERNAL_DIAGNOSTIC);
     }
 
     /**
@@ -340,10 +339,13 @@ public class ClusterProcessor extends GridProcessorAdapter {
      */
     public void dumpBasicInfo(final UUID nodeId, final String msg,
         @Nullable IgniteInClosure<IgniteInternalFuture<String>> lsnr) {
-        if (!DIAGNOSTIC_ENABLED)
+        if (!IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_DIAGNOSTIC_ENABLED, false))
             return;
 
-        IgniteInternalFuture<String> fut = diagnosticInfo(nodeId, new IgniteDiagnosticMessage.BaseClosure(ctx.localNodeId()), msg);
+        IgniteInternalFuture<String> fut = IgniteDiagnosticMessage
+            .newRequest(ctx.localNodeId())
+            .basicInfo(nodeId, msg)
+            .send(ctx);
 
         if (lsnr != null)
             fut.listen(lsnr);
@@ -435,7 +437,7 @@ public class ClusterProcessor extends GridProcessorAdapter {
 
             diagnosticFuturesMap().put(msg.futureId(), fut);
 
-            ctx.io().sendToGridTopic(nodeId, GridTopic.TOPIC_INTERNAL_DIAGNOSTIC, msg, GridIoPolicy.SYSTEM_POOL);
+            ctx.io().sendToGridTopic(nodeId, TOPIC_INTERNAL_DIAGNOSTIC, msg, GridIoPolicy.SYSTEM_POOL);
 
             return fut;
         }


Mime
View raw message