ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [1/6] ignite git commit: IgniteDiagnosticMessage: fixed TxEntriesInfoClosure, added tests.
Date Thu, 22 Jun 2017 12:15:48 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-5566 060feac70 -> 6c50feba9


IgniteDiagnosticMessage: fixed TxEntriesInfoClosure, added tests.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7e9457e7
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7e9457e7
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7e9457e7

Branch: refs/heads/ignite-5566
Commit: 7e9457e71200bfe6218f7805328d45a42918da92
Parents: b16f725
Author: Igor Seliverstov <iseliverstov@gridgain.com>
Authored: Wed Jun 21 17:02:29 2017 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Wed Jun 21 17:02:29 2017 +0300

----------------------------------------------------------------------
 .../internal/IgniteDiagnosticMessage.java       |  19 +-
 .../managers/IgniteDiagnosticMessagesTest.java  | 288 +++++++++++++++++++
 .../ignite/testframework/GridStringLogger.java  |  35 ++-
 3 files changed, 333 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/7e9457e7/modules/core/src/main/java/org/apache/ignite/internal/IgniteDiagnosticMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteDiagnosticMessage.java
b/modules/core/src/main/java/org/apache/ignite/internal/IgniteDiagnosticMessage.java
index 4f37f53..8715e22 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteDiagnosticMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteDiagnosticMessage.java
@@ -17,9 +17,11 @@
 
 package org.apache.ignite.internal;
 
+import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.text.DateFormat;
 import java.text.SimpleDateFormat;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Date;
 import java.util.HashSet;
@@ -265,7 +267,7 @@ public class IgniteDiagnosticMessage implements Message {
         private final int cacheId;
 
         /** */
-        private final Set<KeyCacheObject> keys;
+        private Collection<KeyCacheObject> keys;
 
         /**
          * @param cacheId Cache ID.
@@ -321,6 +323,21 @@ public class IgniteDiagnosticMessage implements Message {
 
             this.keys.addAll(other0.keys);
         }
+
+        /**
+         * @param out Output stream.
+         * @throws IOException If failed.
+         */
+        private void writeObject(java.io.ObjectOutputStream out)
+            throws IOException {
+            /*
+            Transform to List, otherwise Set unmarshalling fails since need
+            call KeyCacheObject.finishUnmarshal before adding in Set.
+             */
+            this.keys = new ArrayList<>(keys);
+
+            out.defaultWriteObject();
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/7e9457e7/modules/core/src/test/java/org/apache/ignite/internal/managers/IgniteDiagnosticMessagesTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/IgniteDiagnosticMessagesTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/managers/IgniteDiagnosticMessagesTest.java
index 08dbc66..1d1b519 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/managers/IgniteDiagnosticMessagesTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/IgniteDiagnosticMessagesTest.java
@@ -17,20 +17,29 @@
 
 package org.apache.ignite.internal.managers;
 
+import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.NearCacheConfiguration;
 import org.apache.ignite.internal.IgniteDiagnosticPrepareContext;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteKernal;
 import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareResponse;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockResponse;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetResponse;
+import org.apache.ignite.internal.util.future.GridCompoundFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
@@ -40,12 +49,15 @@ import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
 import org.apache.ignite.testframework.GridStringLogger;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
 
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_LONG_OPERATIONS_DUMP_TIMEOUT;
 import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
 import static org.apache.ignite.cache.CacheMode.PARTITIONED;
 import static org.apache.ignite.cache.CacheMode.REPLICATED;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
 
 /**
  *
@@ -178,6 +190,282 @@ public class IgniteDiagnosticMessagesTest extends GridCommonAbstractTest
{
     /**
      * @throws Exception If failed.
      */
+    public void testSeveralLongRunningTxs() throws Exception {
+        int timeout = 3500;
+
+        System.setProperty(IGNITE_LONG_OPERATIONS_DUMP_TIMEOUT, String.valueOf(timeout));
+
+        try {
+            testSpi = true;
+
+            startGrid(0);
+
+            GridStringLogger strLog = this.strLog = new GridStringLogger();
+
+            strLog.logLength(1024 * 100);
+
+            startGrid(1);
+
+            awaitPartitionMapExchange();
+
+            CacheConfiguration ccfg = new CacheConfiguration(DEFAULT_CACHE_NAME);
+
+            ccfg.setWriteSynchronizationMode(FULL_SYNC);
+            ccfg.setCacheMode(PARTITIONED);
+            ccfg.setAtomicityMode(TRANSACTIONAL);
+
+            final Ignite node0 = ignite(0);
+            final Ignite node1 = ignite(1);
+
+            node0.createCache(ccfg);
+
+            UUID id0 = node0.cluster().localNode().id();
+
+            TestRecordingCommunicationSpi.spi(node0).blockMessages(GridNearLockResponse.class,
node1.name());
+
+            IgniteCache<Object, Object> cache = node0.cache(DEFAULT_CACHE_NAME);
+
+            int txCnt = 4;
+
+            final List<Integer> keys = primaryKeys(cache, txCnt, 0);
+
+            final AtomicInteger idx = new AtomicInteger();
+
+            IgniteInternalFuture<Long> fut = GridTestUtils.runMultiThreadedAsync(new
Callable<Void>() {
+                @Override public Void call() throws Exception {
+                    try (Transaction tx = node1.transactions().txStart(PESSIMISTIC, REPEATABLE_READ))
{
+                        IgniteCache<Object, Object> cache = node1.cache(DEFAULT_CACHE_NAME);
+
+                        Integer key = keys.get(idx.getAndIncrement() % keys.size());
+
+                        cache.putIfAbsent(key, String.valueOf(key));
+
+                        tx.commit();
+                    }
+
+                    return null;
+                }
+            }, txCnt * 2, "tx");
+
+            U.sleep(timeout * 2);
+
+            assertFalse(fut.isDone());
+
+            TestRecordingCommunicationSpi.spi(node0).stopBlock();
+
+            fut.get();
+
+            String log = strLog.toString();
+
+            assertTrue(log.contains("Cache entries [cacheId=" + CU.cacheId(DEFAULT_CACHE_NAME)
+
+                ", cacheName=" + DEFAULT_CACHE_NAME + "]:"));
+            assertTrue(countTxKeysInASingleBlock(log) == txCnt);
+
+            assertTrue(log.contains("General node info [id=" + id0));
+        }
+        finally {
+            System.clearProperty(IGNITE_LONG_OPERATIONS_DUMP_TIMEOUT);
+        }
+    }
+
+    /**
+     * @param log Log.
+     * @return Count of keys in the first Cache entries block.
+     */
+    private int countTxKeysInASingleBlock(String log) {
+        int idx1 = log.indexOf("Cache entries");
+        int idx2 = log.indexOf("Local communication statistics");
+
+        assert idx1 != -1 && idx2 != -1;
+
+        // The first cache entries info block.
+        String txInfo = log.substring(idx1, idx2);
+
+        String srch = "    Key [";  // Search string.
+        int len = 9;                // Search string length.
+
+        int idx0, cnt = 0;
+
+        while ((idx0 = txInfo.indexOf(srch) + len) >= len) {
+            txInfo = txInfo.substring(idx0);
+
+            cnt++;
+        }
+
+        return cnt;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testLongRunningTx() throws Exception {
+        System.setProperty(IGNITE_LONG_OPERATIONS_DUMP_TIMEOUT, "3500");
+
+        try {
+            startGrid(0);
+
+            GridStringLogger strLog = this.strLog = new GridStringLogger();
+
+            startGrid(1);
+
+            awaitPartitionMapExchange();
+
+            CacheConfiguration ccfg = new CacheConfiguration(DEFAULT_CACHE_NAME);
+
+            ccfg.setWriteSynchronizationMode(FULL_SYNC);
+            ccfg.setCacheMode(PARTITIONED);
+            ccfg.setAtomicityMode(TRANSACTIONAL);
+
+            final Ignite node0 = ignite(0);
+            final Ignite node1 = ignite(1);
+
+            node0.createCache(ccfg);
+
+            UUID id0 = node0.cluster().localNode().id();
+
+            final CountDownLatch l1 = new CountDownLatch(1);
+            final CountDownLatch l2 = new CountDownLatch(1);
+
+            final AtomicReference<Integer> key = new AtomicReference<>();
+
+            GridCompoundFuture<Void, Void> fut = new GridCompoundFuture<>();
+
+            fut.add(GridTestUtils.runAsync(new Callable<Void>() {
+                @Override public Void call() throws Exception {
+                    try (Transaction tx = node0.transactions().txStart()) {
+                        IgniteCache<Object, Object> cache = node0.cache(DEFAULT_CACHE_NAME);
+
+                        key.set(primaryKey(cache));
+
+                        cache.putIfAbsent(key.get(), "dummy val");
+
+                        l1.countDown();
+                        l2.await();
+
+                        tx.commit();
+                    }
+
+                    return null;
+                }
+            }, "tx-1"));
+
+            fut.add(GridTestUtils.runAsync(new Callable<Void>() {
+                @Override public Void call() throws Exception {
+                    try (Transaction tx = node1.transactions().txStart()) {
+                        l1.await();
+
+                        IgniteCache<Object, Object> cache = node1.cache(DEFAULT_CACHE_NAME);
+
+                        cache.replace(key.get(), "dummy val2");
+
+                        tx.commit();
+                    }
+
+                    return null;
+                }
+            }, "tx-2"));
+
+            fut.markInitialized();
+
+            U.sleep(10_000);
+
+            assertFalse(fut.isDone());
+
+            l2.countDown();
+
+            fut.get();
+
+            String log = strLog.toString();
+
+            assertTrue(log.contains("Cache entries [cacheId=" + CU.cacheId(DEFAULT_CACHE_NAME)
+ ", cacheName=" + DEFAULT_CACHE_NAME + "]:"));
+            assertTrue(log.contains("General node info [id=" + id0));
+        }
+        finally {
+            System.clearProperty(IGNITE_LONG_OPERATIONS_DUMP_TIMEOUT);
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testRemoteTx() throws Exception {
+        int timeout = 3500;
+
+        System.setProperty(IGNITE_LONG_OPERATIONS_DUMP_TIMEOUT, String.valueOf(timeout));
+
+        try {
+            testSpi = true;
+
+            startGrid(0);
+
+            GridStringLogger strLog = this.strLog = new GridStringLogger();
+
+            strLog.logLength(1024 * 100);
+
+            startGrid(1);
+
+            awaitPartitionMapExchange();
+
+            CacheConfiguration ccfg = new CacheConfiguration(DEFAULT_CACHE_NAME);
+
+            ccfg.setWriteSynchronizationMode(FULL_SYNC);
+            ccfg.setCacheMode(PARTITIONED);
+            ccfg.setAtomicityMode(TRANSACTIONAL);
+            ccfg.setBackups(1);
+            ccfg.setNearConfiguration(new NearCacheConfiguration());
+
+            final Ignite node0 = ignite(0);
+            final Ignite node1 = ignite(1);
+
+            node0.createCache(ccfg);
+
+            UUID id0 = node0.cluster().localNode().id();
+
+            TestRecordingCommunicationSpi.spi(node0).blockMessages(GridDhtTxPrepareResponse.class,
node1.name());
+
+            int txCnt = 4;
+
+            final List<Integer> keys = primaryKeys(node1.cache(DEFAULT_CACHE_NAME),
txCnt, 0);
+
+            final AtomicInteger idx = new AtomicInteger();
+
+            IgniteInternalFuture<Long> fut = GridTestUtils.runMultiThreadedAsync(new
Callable<Void>() {
+                @Override public Void call() throws Exception {
+                    try (Transaction tx = node1.transactions().txStart()) {
+                        IgniteCache<Object, Object> cache = node1.cache(DEFAULT_CACHE_NAME);
+
+                        Integer key = keys.get(idx.getAndIncrement());
+
+                        cache.getAndPut(key, "new-" + key);
+
+                        tx.commit();
+                    }
+
+                    return null;
+                }
+            }, txCnt, "tx");
+
+            U.sleep(timeout * 2);
+
+            assertFalse(fut.isDone());
+
+            TestRecordingCommunicationSpi.spi(node0).stopBlock();
+
+            fut.get();
+
+            String log = strLog.toString();
+
+            assertTrue(log.contains("Related transactions ["));
+            assertTrue(log.contains("General node info [id=" + id0));
+        }
+        finally {
+            System.clearProperty(IGNITE_LONG_OPERATIONS_DUMP_TIMEOUT);
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     private void checkBasicDiagnosticInfo() throws Exception {
         startGrids(3);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/7e9457e7/modules/core/src/test/java/org/apache/ignite/testframework/GridStringLogger.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/GridStringLogger.java
b/modules/core/src/test/java/org/apache/ignite/testframework/GridStringLogger.java
index 2a25542..ba85bf0 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/GridStringLogger.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/GridStringLogger.java
@@ -38,6 +38,9 @@ public class GridStringLogger implements IgniteLogger {
     private final boolean dbg;
 
     /** */
+    private volatile int chars = CHAR_CNT;
+
+    /** */
     private final IgniteLogger echo;
 
     /**
@@ -64,22 +67,38 @@ public class GridStringLogger implements IgniteLogger {
     }
 
     /**
+     * @param chars History buffer length.
+     */
+    public void logLength(int chars) {
+        this.chars = chars;
+    }
+
+    /**
+     * @return History buffer length.
+     */
+    private int logLength() {
+        return chars;
+    }
+
+    /**
      * @param msg Message to log.
      */
-    private void log(String msg) {
+    private synchronized void log(String msg) {
         buf.append(msg).append(U.nl());
 
         if (echo != null)
             echo.info("[GridStringLogger echo] " + msg);
 
-        if (buf.length() > CHAR_CNT) {
+        int logLength = logLength();
+
+        if (buf.length() > logLength) {
             if (echo != null)
                 echo.warning("Cleaning GridStringLogger history.");
 
-            buf.delete(0, buf.length() - CHAR_CNT);
+            buf.delete(0, buf.length() - logLength);
         }
 
-        assert buf.length() <= CHAR_CNT;
+        assert buf.length() <= logLength;
     }
 
     /** {@inheritDoc} */
@@ -108,7 +127,7 @@ public class GridStringLogger implements IgniteLogger {
     }
 
     /** {@inheritDoc} */
-    @Override public void warning(String msg, @Nullable Throwable e) {
+    @Override public synchronized void warning(String msg, @Nullable Throwable e) {
         log(msg);
 
         if (e != null)
@@ -121,7 +140,7 @@ public class GridStringLogger implements IgniteLogger {
     }
 
     /** {@inheritDoc} */
-    @Override public void error(String msg, @Nullable Throwable e) {
+    @Override public synchronized void error(String msg, @Nullable Throwable e) {
         log(msg);
 
         if (e != null)
@@ -156,12 +175,12 @@ public class GridStringLogger implements IgniteLogger {
     /**
      * Resets logger.
      */
-    public void reset() {
+    public synchronized void reset() {
         buf.setLength(0);
     }
 
     /** {@inheritDoc} */
-    @Override public String toString() {
+    @Override public synchronized String toString() {
         return buf.toString();
     }
 }
\ No newline at end of file


Mime
View raw message