ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [7/7] incubator-ignite git commit: # ignite-746-debug
Date Wed, 15 Apr 2015 07:22:33 GMT
# ignite-746-debug


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/25520e66
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/25520e66
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/25520e66

Branch: refs/heads/ignite-746-debug
Commit: 25520e66cae83fd59faa3ded6eb60a0820f7ab2e
Parents: 8106a79
Author: sboikov <sboikov@gridgain.com>
Authored: Wed Apr 15 10:22:15 2015 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Wed Apr 15 10:22:15 2015 +0300

----------------------------------------------------------------------
 .../dht/atomic/GridNearAtomicUpdateFuture.java  |  88 +++++++++++
 .../distributed/dht/atomic/TestDebugLog.java    | 156 +++++++++++++++++++
 ...eAtomicInvalidPartitionHandlingSelfTest.java |  70 ++++++---
 3 files changed, 294 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25520e66/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index ac4ae2c2..1785cfd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -240,6 +240,17 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
 
     /** {@inheritDoc} */
     @Override public boolean onNodeLeft(UUID nodeId) {
+        for (Object o : keys) {
+            Object key0;
+
+            if (o instanceof KeyCacheObject)
+                key0 = ((KeyCacheObject) o).value(cctx.cacheObjectContext(), false);
+            else
+                key0 = o;
+
+            TestDebugLog.addEntryMessage(key0, null, "nodeLeft");
+        }
+
         Boolean single0 = single;
 
         if (single0 != null && single0) {
@@ -329,6 +340,17 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
      */
     public void onResult(UUID nodeId, GridNearAtomicUpdateResponse res) {
         if (res.remapKeys() != null) {
+            for (Object o : res.remapKeys()) {
+                Object key0;
+
+                if (o instanceof KeyCacheObject)
+                    key0 = ((KeyCacheObject) o).value(cctx.cacheObjectContext(), false);
+                else
+                    key0 = o;
+
+                TestDebugLog.addEntryMessage(key0, null, "remap");
+            }
+
             assert cctx.config().getAtomicWriteOrderMode() == PRIMARY;
 
             mapOnTopology(res.remapKeys(), true, nodeId, true);
@@ -341,6 +363,17 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
         Boolean single0 = single;
 
         if (single0 != null && single0) {
+            for (Object o : keys) {
+                Object key0;
+
+                if (o instanceof KeyCacheObject)
+                    key0 = ((KeyCacheObject) o).value(cctx.cacheObjectContext(), false);
+                else
+                    key0 = o;
+
+                TestDebugLog.addEntryMessage(key0, null, "single response");
+            }
+
             assert singleNodeId.equals(nodeId) : "Invalid response received for single-node mapped future " +
                 "[singleNodeId=" + singleNodeId + ", nodeId=" + nodeId + ", res=" + res + ']';
 
@@ -365,6 +398,17 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
         else {
             GridNearAtomicUpdateRequest req = mappings.get(nodeId);
 
+            for (Object o : keys) {
+                Object key0;
+
+                if (o instanceof KeyCacheObject)
+                    key0 = ((KeyCacheObject) o).value(cctx.cacheObjectContext(), false);
+                else
+                    key0 = o;
+
+                TestDebugLog.addEntryMessage(key0, null, "response");
+            }
+
             if (req != null) { // req can be null if onResult is being processed concurrently with onNodeLeft.
                 updateNear(req, res);
 
@@ -596,6 +640,17 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
 
             single = true;
 
+            for (Object o : keys) {
+                Object key0;
+
+                if (o instanceof KeyCacheObject)
+                    key0 = ((KeyCacheObject) o).value(cctx.cacheObjectContext(), false);
+                else
+                    key0 = o;
+
+                TestDebugLog.addEntryMessage(key0, null, "mapSingle0");
+            }
+
             // Optimize mapping for single key.
             mapSingle(primary.id(), req);
 
@@ -729,6 +784,17 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
 
             single = true;
 
+            for (Object o : keys) {
+                Object key0;
+
+                if (o instanceof KeyCacheObject)
+                    key0 = ((KeyCacheObject) o).value(cctx.cacheObjectContext(), false);
+                else
+                    key0 = o;
+
+                TestDebugLog.addEntryMessage(key0, null, "mapSingle1");
+            }
+
             mapSingle(entry.getKey(), entry.getValue());
 
             return;
@@ -815,12 +881,34 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
                     ", req=" + req + ']';
 
                 locUpdate = req;
+
+                for (Object o : req.keys()) {
+                    Object key0;
+
+                    if (o instanceof KeyCacheObject)
+                        key0 = ((KeyCacheObject) o).value(cctx.cacheObjectContext(), false);
+                    else
+                        key0 = o;
+
+                    TestDebugLog.addEntryMessage(key0, null, "localUpdate");
+                }
             }
             else {
                 try {
                     if (log.isDebugEnabled())
                         log.debug("Sending near atomic update request [nodeId=" + req.nodeId() + ", req=" + req + ']');
 
+                    for (Object o : req.keys()) {
+                        Object key0;
+
+                        if (o instanceof KeyCacheObject)
+                            key0 = ((KeyCacheObject) o).value(cctx.cacheObjectContext(), false);
+                        else
+                            key0 = o;
+
+                        TestDebugLog.addEntryMessage(key0, null, "remoteUpdate");
+                    }
+
                     cctx.io().send(req.nodeId(), req, cctx.ioPolicy());
                 }
                 catch (IgniteCheckedException e) {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25520e66/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/TestDebugLog.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/TestDebugLog.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/TestDebugLog.java
new file mode 100644
index 0000000..46f7324
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/TestDebugLog.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
+
+import java.io.*;
+import java.util.*;
+
+/**
+ * TODO
+ */
+public class TestDebugLog {
+    /** */
+    private static final List<Object> msgs = Collections.synchronizedList(new ArrayList<>(100_000));
+
+    static class Message {
+        String thread = Thread.currentThread().getName();
+
+        String msg;
+
+        public Message(String msg) {
+            this.msg = msg;
+        }
+
+        public String toString() {
+            return "Msg [msg=" + msg + ", thread=" + thread + ']';
+        }
+    }
+
+    static class EntryMessage extends Message {
+        Object key;
+        Object val;
+
+        public EntryMessage(Object key, Object val, String msg) {
+            super(msg);
+
+            this.key = key;
+            this.val = val;
+        }
+
+        public String toString() {
+            return "EntryMsg [key=" + key + ", val=" + val + ", msg=" + msg + ", thread" + thread + ']';
+        }
+    }
+
+    static final boolean out = false;
+
+    public static void addMessage(String msg) {
+        msgs.add(new Message(msg));
+
+        if (out)
+            System.out.println(msg);
+    }
+
+    public static void addEntryMessage(Object key, Object val, String msg) {
+        EntryMessage msg0 = new EntryMessage(key, val, msg);
+
+        msgs.add(msg0);
+
+        if (out)
+            System.out.println(msg0.toString());
+    }
+
+    public static void printMessages(boolean file) {
+        List<Object> msgs0;
+
+        synchronized (msgs) {
+            msgs0 = new ArrayList<>(msgs);
+
+            msgs.clear();
+        }
+
+        if (file) {
+            try {
+                FileOutputStream out = new FileOutputStream("test_debug.log");
+
+                PrintWriter w = new PrintWriter(out);
+
+                for (Object msg : msgs0)
+                    w.println(msg.toString());
+
+                w.close();
+
+                out.close();
+            }
+            catch (IOException e) {
+                e.printStackTrace();
+            }
+        }
+        else {
+            for (Object msg : msgs0)
+                System.out.println(msg);
+        }
+    }
+
+    public static void printKeyMessages(boolean file, Object key) {
+        List<Object> msgs0;
+
+        synchronized (msgs) {
+            msgs0 = new ArrayList<>(msgs);
+
+            msgs.clear();
+        }
+
+        if (file) {
+            try {
+                FileOutputStream out = new FileOutputStream("test_debug.log");
+
+                PrintWriter w = new PrintWriter(out);
+
+                for (Object msg : msgs0) {
+                    if (msg instanceof EntryMessage && !((EntryMessage)msg).key.equals(key))
+                        continue;
+
+                    w.println(msg.toString());
+                }
+
+                w.close();
+
+                out.close();
+            }
+            catch (IOException e) {
+                e.printStackTrace();
+            }
+        }
+        else {
+            for (Object msg : msgs0) {
+                if (msg instanceof EntryMessage && !((EntryMessage)msg).key.equals(key))
+                    continue;
+
+                System.out.println(msg);
+            }
+        }
+    }
+
+    public static void clear() {
+        msgs.clear();
+    }
+
+    public static void main(String[] args) {
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25520e66/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheAtomicInvalidPartitionHandlingSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheAtomicInvalidPartitionHandlingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheAtomicInvalidPartitionHandlingSelfTest.java
index 1f63214..f0a7af0 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheAtomicInvalidPartitionHandlingSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheAtomicInvalidPartitionHandlingSelfTest.java
@@ -28,6 +28,7 @@ import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.cache.version.*;
 import org.apache.ignite.internal.util.lang.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
 import org.apache.ignite.plugin.extensions.communication.*;
 import org.apache.ignite.spi.*;
 import org.apache.ignite.spi.communication.tcp.*;
@@ -101,6 +102,8 @@ public class GridCacheAtomicInvalidPartitionHandlingSelfTest extends GridCommonA
         super.beforeTest();
 
         delay = false;
+
+        TestDebugLog.clear();
     }
 
     /** {@inheritDoc} */
@@ -125,7 +128,7 @@ public class GridCacheAtomicInvalidPartitionHandlingSelfTest extends GridCommonA
     /**
      * @throws Exception If failed.
      */
-    public void testClockFullAsync() throws Exception {
+    public void _testClockFullAsync() throws Exception {
         checkRestarts(CLOCK, FULL_ASYNC);
     }
 
@@ -146,7 +149,7 @@ public class GridCacheAtomicInvalidPartitionHandlingSelfTest extends GridCommonA
     /**
      * @throws Exception If failed.
      */
-    public void testPrimaryFullAsync() throws Exception {
+    public void _testPrimaryFullAsync() throws Exception {
         checkRestarts(PRIMARY, FULL_ASYNC);
     }
 
@@ -169,17 +172,21 @@ public class GridCacheAtomicInvalidPartitionHandlingSelfTest extends GridCommonA
         try {
             final IgniteCache<Object, Object> cache = grid(0).cache(null);
 
+            final IgniteCache<Object, Object> asyncCache = cache.withAsync();
+
             final int range = 100_000;
 
             final Set<Integer> keys = new LinkedHashSet<>();
 
-            for (int i = 0; i < range; i++) {
-                cache.put(i, 0);
+            try (IgniteDataStreamer<Object, Object> streamer = grid(0).dataStreamer(null)) {
+                for (int i = 0; i < range; i++) {
+                    streamer.addData(i, 0);
 
-                keys.add(i);
+                    keys.add(i);
 
-                if (i > 0 && i % 10_000 == 0)
-                    System.err.println("Put: " + i);
+                    if (i > 0 && i % 10_000 == 0)
+                        System.err.println("Put: " + i);
+                }
             }
 
             final Affinity<Integer> aff = grid(0).affinity(null);
@@ -230,38 +237,61 @@ public class GridCacheAtomicInvalidPartitionHandlingSelfTest extends GridCommonA
                     Random rnd = new Random();
 
                     while (!done.get()) {
-                        try {
-                            int cnt = rnd.nextInt(5);
+                        int cnt = rnd.nextInt(5);
 
-                            if (cnt < 2) {
-                                int key = rnd.nextInt(range);
+                        boolean put = cnt < 2;
+
+                        if (put) {
+                            int key = rnd.nextInt(range);
+
+                            int val = rnd.nextInt();
+
+                            TestDebugLog.addEntryMessage(key, val, "put");
+
+                            asyncCache.put(key, val);
+                        }
+                        else {
+                            Map<Integer, Integer> upd = new TreeMap<>();
 
+                            for (int i = 0; i < cnt; i++) {
+                                int key = rnd.nextInt(range);
                                 int val = rnd.nextInt();
 
-                                cache.put(key, val);
+                                upd.put(key, val);
+
+                                TestDebugLog.addEntryMessage(key, val, "putAll");
                             }
-                            else {
-                                Map<Integer, Integer> upd = new TreeMap<>();
 
-                                for (int i = 0; i < cnt; i++)
-                                    upd.put(rnd.nextInt(range), rnd.nextInt());
+                            asyncCache.putAll(upd);
+                        }
 
-                                cache.putAll(upd);
-                            }
+                        try {
+                            asyncCache.future().get(30_000);
+                        }
+                        catch (IgniteFutureTimeoutException e) {
+                            TestDebugLog.addMessage("update timeout, put: " + put);
+
+                            TestDebugLog.printMessages(false);
+
+                            System.exit(22);
                         }
                         catch (CachePartialUpdateException ignored) {
                             // No-op.
                         }
+                        catch (IgniteException e) {
+                            if (!e.hasCause(CachePartialUpdateCheckedException.class))
+                                throw e;
+                        }
                     }
 
                     return null;
                 }
-            }, 4);
+            }, 1, "update-thread");
 
             Random rnd = new Random();
 
             // Restart random nodes.
-            for (int r = 0; r < 20; r++) {
+            for (int r = 0; r < 20 && !fut.isDone(); r++) {
                 int idx0 = rnd.nextInt(gridCnt - 1) + 1;
 
                 stopGrid(idx0);


Mime
View raw message