Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 85A2117F46 for ; Wed, 15 Apr 2015 07:22:50 +0000 (UTC) Received: (qmail 44085 invoked by uid 500); 15 Apr 2015 07:22:50 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 44057 invoked by uid 500); 15 Apr 2015 07:22:50 -0000 Mailing-List: contact commits-help@ignite.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.incubator.apache.org Delivered-To: mailing list commits@ignite.incubator.apache.org Received: (qmail 44048 invoked by uid 99); 15 Apr 2015 07:22:50 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 15 Apr 2015 07:22:50 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED,T_RP_MATCHES_RCVD X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Wed, 15 Apr 2015 07:22:48 +0000 Received: (qmail 43758 invoked by uid 99); 15 Apr 2015 07:22:28 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 15 Apr 2015 07:22:28 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 47957E0A1D; Wed, 15 Apr 2015 07:22:28 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sboikov@apache.org To: commits@ignite.incubator.apache.org Date: Wed, 15 Apr 2015 07:22:33 -0000 Message-Id: <0f1a9434a47145e784fb6b1fdc3a23cc@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [7/7] incubator-ignite git commit: # ignite-746-debug X-Virus-Checked: Checked by ClamAV on apache.org # ignite-746-debug Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/25520e66 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/25520e66 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/25520e66 Branch: refs/heads/ignite-746-debug Commit: 25520e66cae83fd59faa3ded6eb60a0820f7ab2e Parents: 8106a79 Author: sboikov Authored: Wed Apr 15 10:22:15 2015 +0300 Committer: sboikov Committed: Wed Apr 15 10:22:15 2015 +0300 ---------------------------------------------------------------------- .../dht/atomic/GridNearAtomicUpdateFuture.java | 88 +++++++++++ .../distributed/dht/atomic/TestDebugLog.java | 156 +++++++++++++++++++ ...eAtomicInvalidPartitionHandlingSelfTest.java | 70 ++++++--- 3 files changed, 294 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25520e66/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java index ac4ae2c2..1785cfd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java @@ -240,6 +240,17 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter /** {@inheritDoc} */ @Override public boolean onNodeLeft(UUID nodeId) { + for (Object o : keys) { + Object key0; + + if (o instanceof KeyCacheObject) + key0 = ((KeyCacheObject) o).value(cctx.cacheObjectContext(), false); + else + key0 = o; + + TestDebugLog.addEntryMessage(key0, null, "nodeLeft"); + } + Boolean single0 = single; if (single0 != null && single0) { @@ -329,6 +340,17 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter */ public void onResult(UUID nodeId, GridNearAtomicUpdateResponse res) { if (res.remapKeys() != null) { + for (Object o : res.remapKeys()) { + Object key0; + + if (o instanceof KeyCacheObject) + key0 = ((KeyCacheObject) o).value(cctx.cacheObjectContext(), false); + else + key0 = o; + + TestDebugLog.addEntryMessage(key0, null, "remap"); + } + assert cctx.config().getAtomicWriteOrderMode() == PRIMARY; mapOnTopology(res.remapKeys(), true, nodeId, true); @@ -341,6 +363,17 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter Boolean single0 = single; if (single0 != null && single0) { + for (Object o : keys) { + Object key0; + + if (o instanceof KeyCacheObject) + key0 = ((KeyCacheObject) o).value(cctx.cacheObjectContext(), false); + else + key0 = o; + + TestDebugLog.addEntryMessage(key0, null, "single response"); + } + assert singleNodeId.equals(nodeId) : "Invalid response received for single-node mapped future " + "[singleNodeId=" + singleNodeId + ", nodeId=" + nodeId + ", res=" + res + ']'; @@ -365,6 +398,17 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter else { GridNearAtomicUpdateRequest req = mappings.get(nodeId); + for (Object o : keys) { + Object key0; + + if (o instanceof KeyCacheObject) + key0 = ((KeyCacheObject) o).value(cctx.cacheObjectContext(), false); + else + key0 = o; + + TestDebugLog.addEntryMessage(key0, null, "response"); + } + if (req != null) { // req can be null if onResult is being processed concurrently with onNodeLeft. updateNear(req, res); @@ -596,6 +640,17 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter single = true; + for (Object o : keys) { + Object key0; + + if (o instanceof KeyCacheObject) + key0 = ((KeyCacheObject) o).value(cctx.cacheObjectContext(), false); + else + key0 = o; + + TestDebugLog.addEntryMessage(key0, null, "mapSingle0"); + } + // Optimize mapping for single key. mapSingle(primary.id(), req); @@ -729,6 +784,17 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter single = true; + for (Object o : keys) { + Object key0; + + if (o instanceof KeyCacheObject) + key0 = ((KeyCacheObject) o).value(cctx.cacheObjectContext(), false); + else + key0 = o; + + TestDebugLog.addEntryMessage(key0, null, "mapSingle1"); + } + mapSingle(entry.getKey(), entry.getValue()); return; @@ -815,12 +881,34 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter ", req=" + req + ']'; locUpdate = req; + + for (Object o : req.keys()) { + Object key0; + + if (o instanceof KeyCacheObject) + key0 = ((KeyCacheObject) o).value(cctx.cacheObjectContext(), false); + else + key0 = o; + + TestDebugLog.addEntryMessage(key0, null, "localUpdate"); + } } else { try { if (log.isDebugEnabled()) log.debug("Sending near atomic update request [nodeId=" + req.nodeId() + ", req=" + req + ']'); + for (Object o : req.keys()) { + Object key0; + + if (o instanceof KeyCacheObject) + key0 = ((KeyCacheObject) o).value(cctx.cacheObjectContext(), false); + else + key0 = o; + + TestDebugLog.addEntryMessage(key0, null, "remoteUpdate"); + } + cctx.io().send(req.nodeId(), req, cctx.ioPolicy()); } catch (IgniteCheckedException e) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25520e66/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/TestDebugLog.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/TestDebugLog.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/TestDebugLog.java new file mode 100644 index 0000000..46f7324 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/TestDebugLog.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.dht.atomic; + +import java.io.*; +import java.util.*; + +/** + * TODO + */ +public class TestDebugLog { + /** */ + private static final List msgs = Collections.synchronizedList(new ArrayList<>(100_000)); + + static class Message { + String thread = Thread.currentThread().getName(); + + String msg; + + public Message(String msg) { + this.msg = msg; + } + + public String toString() { + return "Msg [msg=" + msg + ", thread=" + thread + ']'; + } + } + + static class EntryMessage extends Message { + Object key; + Object val; + + public EntryMessage(Object key, Object val, String msg) { + super(msg); + + this.key = key; + this.val = val; + } + + public String toString() { + return "EntryMsg [key=" + key + ", val=" + val + ", msg=" + msg + ", thread" + thread + ']'; + } + } + + static final boolean out = false; + + public static void addMessage(String msg) { + msgs.add(new Message(msg)); + + if (out) + System.out.println(msg); + } + + public static void addEntryMessage(Object key, Object val, String msg) { + EntryMessage msg0 = new EntryMessage(key, val, msg); + + msgs.add(msg0); + + if (out) + System.out.println(msg0.toString()); + } + + public static void printMessages(boolean file) { + List msgs0; + + synchronized (msgs) { + msgs0 = new ArrayList<>(msgs); + + msgs.clear(); + } + + if (file) { + try { + FileOutputStream out = new FileOutputStream("test_debug.log"); + + PrintWriter w = new PrintWriter(out); + + for (Object msg : msgs0) + w.println(msg.toString()); + + w.close(); + + out.close(); + } + catch (IOException e) { + e.printStackTrace(); + } + } + else { + for (Object msg : msgs0) + System.out.println(msg); + } + } + + public static void printKeyMessages(boolean file, Object key) { + List msgs0; + + synchronized (msgs) { + msgs0 = new ArrayList<>(msgs); + + msgs.clear(); + } + + if (file) { + try { + FileOutputStream out = new FileOutputStream("test_debug.log"); + + PrintWriter w = new PrintWriter(out); + + for (Object msg : msgs0) { + if (msg instanceof EntryMessage && !((EntryMessage)msg).key.equals(key)) + continue; + + w.println(msg.toString()); + } + + w.close(); + + out.close(); + } + catch (IOException e) { + e.printStackTrace(); + } + } + else { + for (Object msg : msgs0) { + if (msg instanceof EntryMessage && !((EntryMessage)msg).key.equals(key)) + continue; + + System.out.println(msg); + } + } + } + + public static void clear() { + msgs.clear(); + } + + public static void main(String[] args) { + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25520e66/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheAtomicInvalidPartitionHandlingSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheAtomicInvalidPartitionHandlingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheAtomicInvalidPartitionHandlingSelfTest.java index 1f63214..f0a7af0 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheAtomicInvalidPartitionHandlingSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheAtomicInvalidPartitionHandlingSelfTest.java @@ -28,6 +28,7 @@ import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.version.*; import org.apache.ignite.internal.util.lang.*; import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.lang.*; import org.apache.ignite.plugin.extensions.communication.*; import org.apache.ignite.spi.*; import org.apache.ignite.spi.communication.tcp.*; @@ -101,6 +102,8 @@ public class GridCacheAtomicInvalidPartitionHandlingSelfTest extends GridCommonA super.beforeTest(); delay = false; + + TestDebugLog.clear(); } /** {@inheritDoc} */ @@ -125,7 +128,7 @@ public class GridCacheAtomicInvalidPartitionHandlingSelfTest extends GridCommonA /** * @throws Exception If failed. */ - public void testClockFullAsync() throws Exception { + public void _testClockFullAsync() throws Exception { checkRestarts(CLOCK, FULL_ASYNC); } @@ -146,7 +149,7 @@ public class GridCacheAtomicInvalidPartitionHandlingSelfTest extends GridCommonA /** * @throws Exception If failed. */ - public void testPrimaryFullAsync() throws Exception { + public void _testPrimaryFullAsync() throws Exception { checkRestarts(PRIMARY, FULL_ASYNC); } @@ -169,17 +172,21 @@ public class GridCacheAtomicInvalidPartitionHandlingSelfTest extends GridCommonA try { final IgniteCache cache = grid(0).cache(null); + final IgniteCache asyncCache = cache.withAsync(); + final int range = 100_000; final Set keys = new LinkedHashSet<>(); - for (int i = 0; i < range; i++) { - cache.put(i, 0); + try (IgniteDataStreamer streamer = grid(0).dataStreamer(null)) { + for (int i = 0; i < range; i++) { + streamer.addData(i, 0); - keys.add(i); + keys.add(i); - if (i > 0 && i % 10_000 == 0) - System.err.println("Put: " + i); + if (i > 0 && i % 10_000 == 0) + System.err.println("Put: " + i); + } } final Affinity aff = grid(0).affinity(null); @@ -230,38 +237,61 @@ public class GridCacheAtomicInvalidPartitionHandlingSelfTest extends GridCommonA Random rnd = new Random(); while (!done.get()) { - try { - int cnt = rnd.nextInt(5); + int cnt = rnd.nextInt(5); - if (cnt < 2) { - int key = rnd.nextInt(range); + boolean put = cnt < 2; + + if (put) { + int key = rnd.nextInt(range); + + int val = rnd.nextInt(); + + TestDebugLog.addEntryMessage(key, val, "put"); + + asyncCache.put(key, val); + } + else { + Map upd = new TreeMap<>(); + for (int i = 0; i < cnt; i++) { + int key = rnd.nextInt(range); int val = rnd.nextInt(); - cache.put(key, val); + upd.put(key, val); + + TestDebugLog.addEntryMessage(key, val, "putAll"); } - else { - Map upd = new TreeMap<>(); - for (int i = 0; i < cnt; i++) - upd.put(rnd.nextInt(range), rnd.nextInt()); + asyncCache.putAll(upd); + } - cache.putAll(upd); - } + try { + asyncCache.future().get(30_000); + } + catch (IgniteFutureTimeoutException e) { + TestDebugLog.addMessage("update timeout, put: " + put); + + TestDebugLog.printMessages(false); + + System.exit(22); } catch (CachePartialUpdateException ignored) { // No-op. } + catch (IgniteException e) { + if (!e.hasCause(CachePartialUpdateCheckedException.class)) + throw e; + } } return null; } - }, 4); + }, 1, "update-thread"); Random rnd = new Random(); // Restart random nodes. - for (int r = 0; r < 20; r++) { + for (int r = 0; r < 20 && !fut.isDone(); r++) { int idx0 = rnd.nextInt(gridCnt - 1) + 1; stopGrid(idx0);