ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [05/19] ignite git commit: IGNITE-5155
Date Sat, 10 Jun 2017 06:20:36 GMT
IGNITE-5155


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/16b6d1b0
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/16b6d1b0
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/16b6d1b0

Branch: refs/heads/ignite-5155
Commit: 16b6d1b00246450139402a83a603b8a4af9d18c7
Parents: a0cdab2
Author: Igor Seliverstov <gvvinblade@gmail.com>
Authored: Thu Jun 8 16:00:28 2017 +0300
Committer: Igor Seliverstov <gvvinblade@gmail.com>
Committed: Thu Jun 8 16:00:28 2017 +0300

----------------------------------------------------------------------
 .../ignite/internal/IgniteDiagnosticAware.java  |   2 +-
 .../internal/IgniteDiagnosticMessage.java       | 191 +++++++++++++++++--
 .../GridCachePartitionExchangeManager.java      |   4 +-
 .../distributed/dht/GridDhtTxPrepareFuture.java |   2 +-
 .../dht/GridPartitionedSingleGetFuture.java     |   2 +-
 .../colocated/GridDhtColocatedLockFuture.java   |   2 +-
 .../GridDhtPartitionsExchangeFuture.java        |   2 +-
 .../processors/cluster/ClusterProcessor.java    |   8 +-
 8 files changed, 186 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/16b6d1b0/modules/core/src/main/java/org/apache/ignite/internal/IgniteDiagnosticAware.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteDiagnosticAware.java
b/modules/core/src/main/java/org/apache/ignite/internal/IgniteDiagnosticAware.java
index 44f531e..6f724af 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteDiagnosticAware.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteDiagnosticAware.java
@@ -21,5 +21,5 @@ package org.apache.ignite.internal;
  *
  */
 public interface IgniteDiagnosticAware {
-    public void dumpDiagnosticInfo();
+    public void prepareRequest();
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/16b6d1b0/modules/core/src/main/java/org/apache/ignite/internal/IgniteDiagnosticMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteDiagnosticMessage.java
b/modules/core/src/main/java/org/apache/ignite/internal/IgniteDiagnosticMessage.java
index 9561b65..180d394 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteDiagnosticMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteDiagnosticMessage.java
@@ -20,9 +20,16 @@ package org.apache.ignite.internal;
 import java.nio.ByteBuffer;
 import java.text.DateFormat;
 import java.text.SimpleDateFormat;
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
 import java.util.Date;
+import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
+import java.util.Objects;
+import java.util.Set;
 import java.util.UUID;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
@@ -242,10 +249,10 @@ public class IgniteDiagnosticMessage implements Message {
         protected final UUID nodeId;
 
         /**
-         * @param ctx Local node context.
+         * @param nodeId Local node context.
          */
-        public BaseClosure(GridKernalContext ctx) {
-            this.nodeId = ctx.localNodeId();
+        public BaseClosure(UUID nodeId) {
+            this.nodeId = nodeId;
         }
 
         /** {@inheritDoc} */
@@ -288,7 +295,7 @@ public class IgniteDiagnosticMessage implements Message {
     /**
      *
      */
-    public static class TxEntriesInfoClosure extends BaseClosure {
+    public final static class TxEntriesInfoClosure extends BaseClosure {
         /** */
         private static final long serialVersionUID = 0L;
 
@@ -299,12 +306,12 @@ public class IgniteDiagnosticMessage implements Message {
         private final Collection<KeyCacheObject> keys;
 
         /**
-         * @param ctx Context.
+         * @param nodeId Local node ID.
          * @param cacheId Cache ID.
          * @param keys Keys.
          */
-        public TxEntriesInfoClosure(GridKernalContext ctx, int cacheId, Collection<KeyCacheObject>
keys) {
-            super(ctx);
+        public TxEntriesInfoClosure(UUID nodeId, int cacheId, Collection<KeyCacheObject>
keys) {
+            super(nodeId);
 
             this.cacheId = cacheId;
             this.keys = keys;
@@ -341,12 +348,25 @@ public class IgniteDiagnosticMessage implements Message {
                 sb.append(U.nl()).append("    Key [key=").append(key).append(", entry=").append(e).append("]");
             }
         }
+
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+            if (o == null || getClass() != o.getClass())
+                return false;
+            TxEntriesInfoClosure closure = (TxEntriesInfoClosure)o;
+            return cacheId == closure.cacheId;
+        }
+
+        @Override public int hashCode() {
+            return Objects.hash(cacheId);
+        }
     }
 
     /**
      *
      */
-    public static class ExchangeInfoClosure extends BaseClosure {
+    public final static class ExchangeInfoClosure extends BaseClosure {
         /** */
         private static final long serialVersionUID = 0L;
 
@@ -354,11 +374,11 @@ public class IgniteDiagnosticMessage implements Message {
         private final AffinityTopologyVersion topVer;
 
         /**
-         * @param ctx Context.
+         * @param nodeId Local node ID.
          * @param topVer Exchange version.
          */
-        public ExchangeInfoClosure(GridKernalContext ctx, AffinityTopologyVersion topVer)
{
-            super(ctx);
+        public ExchangeInfoClosure(UUID nodeId, AffinityTopologyVersion topVer) {
+            super(nodeId);
 
             this.topVer = topVer;
         }
@@ -379,12 +399,25 @@ public class IgniteDiagnosticMessage implements Message {
 
             sb.append("Failed to find exchange future: ").append(topVer);
         }
+
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+            if (o == null || getClass() != o.getClass())
+                return false;
+            ExchangeInfoClosure closure = (ExchangeInfoClosure)o;
+            return Objects.equals(topVer, closure.topVer);
+        }
+
+        @Override public int hashCode() {
+            return Objects.hash(topVer);
+        }
     }
 
     /**
      *
      */
-    public static class TxInfoClosure extends BaseClosure {
+    public final static class TxInfoClosure extends BaseClosure {
         /** */
         private static final long serialVersionUID = 0L;
 
@@ -395,14 +428,14 @@ public class IgniteDiagnosticMessage implements Message {
         private final GridCacheVersion nearVer;
 
         /**
-         * @param ctx Context.
+         * @param nodeId Local node ID.
          * @param dhtVer Tx dht version.
          * @param nearVer Tx near version.
          */
-        public TxInfoClosure(GridKernalContext ctx,
+        public TxInfoClosure(UUID nodeId,
             GridCacheVersion dhtVer,
             GridCacheVersion nearVer) {
-            super(ctx);
+            super(nodeId);
 
             this.dhtVer = dhtVer;
             this.nearVer = nearVer;
@@ -432,6 +465,132 @@ public class IgniteDiagnosticMessage implements Message {
             if (!found)
                 sb.append(U.nl()).append("Failed to find related transactions.");
         }
+
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+            if (o == null || getClass() != o.getClass())
+                return false;
+            TxInfoClosure closure = (TxInfoClosure)o;
+            return Objects.equals(dhtVer, closure.dhtVer) &&
+                Objects.equals(nearVer, closure.nearVer);
+        }
+
+        @Override public int hashCode() {
+            return Objects.hash(dhtVer, nearVer);
+        }
+    }
+
+    /**
+     *
+     */
+    public final static class CompoundInfoClosure extends BaseClosure {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** */
+        private Set<BaseClosure> cls = new HashSet<>();
+
+        /**
+         * @param nodeId Local node ID.
+         */
+        public CompoundInfoClosure(UUID nodeId) {
+            super(nodeId);
+        }
+
+        /**
+         * @return Node ID.
+         */
+        public UUID nodeId() {
+            return nodeId;
+        }
+
+        public <T extends BaseClosure> void add(T cl) {
+            assert cl != null && cl.nodeId == nodeId;
+
+            if(!cls.add(cl) && cl.getClass() == TxEntriesInfoClosure.class) {
+                // The entry with the same cache id is already exist, need to merge its keys.
+                TxEntriesInfoClosure cl0 = (TxEntriesInfoClosure)cl;
+                TxEntriesInfoClosure cur = getExisted(cl0);
+
+                Collection<KeyCacheObject> keys = new HashSet<>();
+
+                keys.addAll(cur.keys);
+                keys.addAll(cl0.keys);
+
+                cls.add(new TxEntriesInfoClosure(cur.nodeId, cur.cacheId, keys));
+            }
+        }
+
+        /**
+         * @param cl Closure.
+         * @return Existed closure.
+         */
+        private TxEntriesInfoClosure getExisted(TxEntriesInfoClosure cl) {
+            Iterator<BaseClosure> it = cls.iterator();
+
+            while (it.hasNext()) {
+                BaseClosure cl0 = it.next();
+
+                if (cl0.equals(cl)) {
+                    it.remove();
+
+                    return (TxEntriesInfoClosure)cl0;
+                }
+            }
+
+            throw new AssertionError("Existed closure is not found");
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void moreInfo(StringBuilder sb, GridKernalContext ctx) {
+            ArrayList<BaseClosure> cls = new ArrayList<>(this.cls);
+
+            Collections.sort(cls, TypeComparator.INSTANCE);
+
+            for (int i = 0; i < cls.size(); i++) {
+                try {
+                    cls.get(i).moreInfo(sb, ctx);
+                }
+                catch (Exception e) {
+                    ctx.cluster().diagnosticLog().error("Failed to populate diagnostic with
additional information: " + e, e);
+
+                    sb.append(U.nl()).append("Failed to populate diagnostic with additional
information: ").append(e);
+                }
+            }
+        }
+    }
+
+    /**
+     *
+     */
+    private static final class TypeComparator implements Comparator<BaseClosure> {
+        public static final TypeComparator INSTANCE = new TypeComparator();
+
+        /** {@inheritDoc} */
+        @Override public int compare(BaseClosure c1, BaseClosure c2) {
+            return Integer.compare(order(c1), order(c2));
+        }
+
+        /**
+         * @param cl Closure.
+         * @return Type order.
+         */
+        private int order(BaseClosure cl) {
+            if (cl == null)
+                return 0;
+
+            if (cl.getClass() == ExchangeInfoClosure.class)
+                return 1;
+
+            if (cl.getClass() == TxInfoClosure.class)
+                return 2;
+
+            if (cl.getClass() == TxEntriesInfoClosure.class)
+                return 3;
+
+            return Integer.MAX_VALUE;
+        }
     }
 
     /**
@@ -457,7 +616,6 @@ public class IgniteDiagnosticMessage implements Message {
         sb.append("Partitions exchange info [readyVer=").append(exchMgr.readyAffinityVersion()).append(']').append(U.nl())
             .append("Last initialized exchange future: ").append(fut);
     }
-
     /**
      * @param ctx Context.
      * @param nodeId Target node ID.
@@ -469,6 +627,7 @@ public class IgniteDiagnosticMessage implements Message {
         else
             return new GridFinishedFuture<>("Unexpected communication SPI: " + ctx.config().getCommunicationSpi());
     }
+
     /**
      * @param time Time.
      * @return Time string.

http://git-wip-us.apache.org/repos/asf/ignite/blob/16b6d1b0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index e9587f5..0f0dcd3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -1395,7 +1395,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
             for (GridDhtPartitionsExchangeFuture fut : exchFuts.values()) {
                 U.warn(diagnosticLog, ">>> " + fut.shortInfo());
 
-                fut.dumpDiagnosticInfo();
+                fut.prepareRequest();
 
                 if (++cnt == 10)
                     break;
@@ -1600,7 +1600,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
         U.warn(diagnosticLog, ">>> " + fut);
 
         if (fut instanceof IgniteDiagnosticAware)
-            ((IgniteDiagnosticAware)fut).dumpDiagnosticInfo();
+            ((IgniteDiagnosticAware)fut).prepareRequest();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/16b6d1b0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index 8bb6111..832b33b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -1568,7 +1568,7 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<Ignite
     }
 
     /** {@inheritDoc} */
-    @Override public void dumpDiagnosticInfo() {
+    @Override public void prepareRequest() {
         if (!isDone()) {
             for (IgniteInternalFuture fut : futures()) {
                 if (!fut.isDone() && fut instanceof MiniFuture) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/16b6d1b0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
index 9ce3842..a0db3e7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
@@ -768,7 +768,7 @@ public class GridPartitionedSingleGetFuture extends GridCacheFutureAdapter<Objec
     }
 
     /** {@inheritDoc} */
-    @Override public void dumpDiagnosticInfo() {
+    @Override public void prepareRequest() {
         if (!isDone()) {
             UUID nodeId;
             AffinityTopologyVersion topVer;

http://git-wip-us.apache.org/repos/asf/ignite/blob/16b6d1b0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
index 69bd57f..ecbf177 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
@@ -597,7 +597,7 @@ public final class GridDhtColocatedLockFuture extends GridCacheCompoundIdentityF
     }
 
     /** {@inheritDoc} */
-    @Override public void dumpDiagnosticInfo() {
+    @Override public void prepareRequest() {
         if (!isDone()) {
             for (IgniteInternalFuture fut : futures()) {
                 if (!fut.isDone() && isMini(fut)) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/16b6d1b0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 88d31e2..8c23473 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -2158,7 +2158,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
     }
 
     /** {@inheritDoc} */
-    @Override public void dumpDiagnosticInfo() {
+    @Override public void prepareRequest() {
         if (!isDone()) {
             ClusterNode crd;
             Set<UUID> remaining;

http://git-wip-us.apache.org/repos/asf/ignite/blob/16b6d1b0/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java
index e715709..13546df 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java
@@ -345,7 +345,7 @@ public class ClusterProcessor extends GridProcessorAdapter {
             return;
 
         IgniteInternalFuture<String> fut = diagnosticInfo(nodeId,
-            new IgniteDiagnosticMessage.TxInfoClosure(ctx, dhtVer, nearVer),
+            new IgniteDiagnosticMessage.TxInfoClosure(ctx.localNodeId(), dhtVer, nearVer),
             msg);
 
         listenAndLog(fut);
@@ -361,7 +361,7 @@ public class ClusterProcessor extends GridProcessorAdapter {
         if (!DIAGNOSTIC_ENABLED)
             return;
 
-        IgniteInternalFuture<String> fut = diagnosticInfo(nodeId, new IgniteDiagnosticMessage.TxEntriesInfoClosure(ctx,
cacheId, keys), msg);
+        IgniteInternalFuture<String> fut = diagnosticInfo(nodeId, new IgniteDiagnosticMessage.TxEntriesInfoClosure(ctx.localNodeId(),
cacheId, keys), msg);
 
         listenAndLog(fut);
     }
@@ -375,7 +375,7 @@ public class ClusterProcessor extends GridProcessorAdapter {
         if (!DIAGNOSTIC_ENABLED)
             return;
 
-        IgniteInternalFuture<String> fut = diagnosticInfo(nodeId, new IgniteDiagnosticMessage.BaseClosure(ctx),
msg);
+        IgniteInternalFuture<String> fut = diagnosticInfo(nodeId, new IgniteDiagnosticMessage.BaseClosure(ctx.localNodeId()),
msg);
 
         if (lsnr != null)
             fut.listen(lsnr);
@@ -392,7 +392,7 @@ public class ClusterProcessor extends GridProcessorAdapter {
         if (!DIAGNOSTIC_ENABLED)
             return;
 
-        IgniteInternalFuture<String> fut = diagnosticInfo(nodeId, new IgniteDiagnosticMessage.ExchangeInfoClosure(ctx,
topVer), msg);
+        IgniteInternalFuture<String> fut = diagnosticInfo(nodeId, new IgniteDiagnosticMessage.ExchangeInfoClosure(ctx.localNodeId(),
topVer), msg);
 
         listenAndLog(fut);
     }


Mime
View raw message