ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From agoncha...@apache.org
Subject [26/50] [abbrv] ignite git commit: IGNITE-264 - Fixing tests WIP.
Date Thu, 03 Sep 2015 01:03:35 GMT
IGNITE-264 - Fixing tests WIP.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f9511aff
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f9511aff
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f9511aff

Branch: refs/heads/ignite-264
Commit: f9511aff95fd6fecff3da3bc70143d3e74e4aaaf
Parents: a733984
Author: Alexey Goncharuk <agoncharuk@gridgain.com>
Authored: Thu Aug 13 18:28:00 2015 -0700
Committer: Alexey Goncharuk <agoncharuk@gridgain.com>
Committed: Thu Aug 13 18:28:00 2015 -0700

----------------------------------------------------------------------
 .../near/GridNearOptimisticTxPrepareFuture.java |  33 ++--
 .../GridNearPessimisticTxPrepareFuture.java     |   8 +-
 .../near/GridNearTxFinishFuture.java            |  98 +++++------
 .../cache/distributed/near/GridNearTxLocal.java |  43 +----
 .../dht/GridCacheTxNodeFailureSelfTest.java     | 165 ++++++++++++++++---
 5 files changed, 230 insertions(+), 117 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/f9511aff/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
index 95e1847..28069b2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
@@ -118,8 +118,13 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd
      */
     void onError(@Nullable UUID nodeId, @Nullable Iterable<GridDistributedTxMapping>
mappings, Throwable e) {
         if (X.hasCause(e, ClusterTopologyCheckedException.class) || X.hasCause(e, ClusterTopologyException.class))
{
-            if (tx.onePhaseCommit())
+            if (tx.onePhaseCommit()) {
                 tx.markForBackupCheck();
+
+                onComplete();
+
+                return;
+            }
         }
 
         if (err.compareAndSet(null, e)) {
@@ -189,17 +194,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd
 
         this.err.compareAndSet(null, err);
 
-        if (err == null)
-            tx.state(PREPARED);
-
-        if (super.onDone(tx, err)) {
-            // Don't forget to clean up.
-            cctx.mvcc().removeFuture(this);
-
-            return true;
-        }
-
-        return false;
+        return onComplete();
     }
 
     /**
@@ -213,10 +208,20 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd
     /**
      * Completeness callback.
      */
-    private void onComplete() {
-        if (super.onDone(tx, err.get()))
+    private boolean onComplete() {
+        Throwable err0 = err.get();
+
+        if (err0 == null || tx.needCheckBackup())
+            tx.state(PREPARED);
+
+        if (super.onDone(tx, err0)) {
             // Don't forget to clean up.
             cctx.mvcc().removeFuture(this);
+
+            return true;
+        }
+
+        return false;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/f9511aff/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
index 6de46f4..6ac1033 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
@@ -242,7 +242,7 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
 
         err = this.err.get();
 
-        if (err == null)
+        if (err == null || tx.needCheckBackup())
             tx.state(PREPARED);
 
         if (super.onDone(tx, err)) {
@@ -320,9 +320,13 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
          * @param e Error.
          */
         void onNodeLeft(ClusterTopologyCheckedException e) {
-            if (tx.onePhaseCommit())
+            if (tx.onePhaseCommit()) {
                 tx.markForBackupCheck();
 
+                // Do not fail future for one-phase transaction right away.
+                onDone(tx);
+            }
+
             onError(e);
         }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/f9511aff/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
index 1e16982..95f5149 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
@@ -33,6 +33,7 @@ import org.apache.ignite.internal.util.tostring.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.lang.*;
+import org.apache.ignite.transactions.*;
 import org.jetbrains.annotations.*;
 
 import java.util.*;
@@ -227,29 +228,46 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
     }
 
     /** {@inheritDoc} */
-    @Override public boolean onDone(IgniteInternalTx tx, Throwable err) {
+    @Override public boolean onDone(IgniteInternalTx tx0, Throwable err) {
         if ((initialized() || err != null)) {
-            if (this.tx.onePhaseCommit() && (this.tx.state() == COMMITTING)) {
+            if (tx.needCheckBackup()) {
+                assert tx.onePhaseCommit();
+
+                if (err != null)
+                    err = new TransactionRollbackException("Failed to commit transaction.",
err);
+
+                try {
+                    tx.finish(err == null);
+                }
+                catch (IgniteCheckedException e) {
+                    if (err != null)
+                        err.addSuppressed(e);
+                    else
+                        err = e;
+                }
+            }
+
+            if (tx.onePhaseCommit()) {
                 finishOnePhase();
 
-                this.tx.tmFinish(err == null);
+                tx.tmFinish(err == null);
             }
 
             Throwable th = this.err.get();
 
-            if (super.onDone(tx, th != null ? th : err)) {
+            if (super.onDone(tx0, th != null ? th : err)) {
                 if (error() instanceof IgniteTxHeuristicCheckedException) {
-                    AffinityTopologyVersion topVer = this.tx.topologyVersion();
+                    AffinityTopologyVersion topVer = tx.topologyVersion();
 
-                    for (IgniteTxEntry e : this.tx.writeMap().values()) {
+                    for (IgniteTxEntry e : tx.writeMap().values()) {
                         GridCacheContext cacheCtx = e.context();
 
                         try {
                             if (e.op() != NOOP && !cacheCtx.affinity().localNode(e.key(),
topVer)) {
-                                GridCacheEntryEx Entry = cacheCtx.cache().peekEx(e.key());
+                                GridCacheEntryEx entry = cacheCtx.cache().peekEx(e.key());
 
-                                if (Entry != null)
-                                    Entry.invalidate(null, this.tx.xidVersion());
+                                if (entry != null)
+                                    entry.invalidate(null, tx.xidVersion());
                             }
                         }
                         catch (Throwable t) {
@@ -297,13 +315,24 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
      * Initializes future.
      */
     void finish() {
-        if (tx.onePhaseCommit()) {
-            if (commit) {
-                if (tx.needCheckBackup())
-                    checkBackup();
-                else if (needFinishOnePhase()) {
+        if (tx.needCheckBackup()) {
+            assert tx.onePhaseCommit();
+
+            checkBackup();
+
+            // If checkBackup is set, it means that primary node has crashed and we will
not need to send
+            // finish request to it, so we can mark future as initialized.
+            markInitialized();
+        }
+
+        try {
+            if (tx.finish(commit) || (!commit && tx.state() == UNKNOWN)) {
+                if ((tx.onePhaseCommit() && needFinishOnePhase()) || (!tx.onePhaseCommit()
&& mappings != null))
                     finish(mappings.values());
 
+                markInitialized();
+
+                if (!isSync()) {
                     boolean complete = true;
 
                     for (IgniteInternalFuture<?> f : pending())
@@ -315,40 +344,16 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
                         onComplete();
                 }
             }
-
-            markInitialized();
-
-            return;
+            else
+                onError(new IgniteCheckedException("Failed to commit transaction: " + CU.txString(tx)));
         }
+        catch (Error | RuntimeException e) {
+            onError(e);
 
-        if (mappings != null) {
-            finish(mappings.values());
-
-            markInitialized();
-
-            if (!isSync()) {
-                boolean complete = true;
-
-                for (IgniteInternalFuture<?> f : pending())
-                    // Mini-future in non-sync mode gets done when message gets sent.
-                    if (isMini(f) && !f.isDone())
-                        complete = false;
-
-                if (complete)
-                    onComplete();
-            }
+            throw e;
         }
-        else {
-            assert !commit;
-
-            try {
-                tx.rollback();
-            }
-            catch (IgniteCheckedException e) {
-                U.error(log, "Failed to rollback empty transaction: " + tx, e);
-            }
-
-            markInitialized();
+        catch (IgniteCheckedException e) {
+            onError(e);
         }
     }
 
@@ -641,8 +646,9 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
         void onResult(GridDhtTxFinishResponse res) {
             assert backup != null;
 
-            if (res.checkCommittedError() != null)
+            if (res.checkCommittedError() != null) {
                 onDone(res.checkCommittedError());
+            }
             else
                 onDone(tx);
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/f9511aff/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index c40ac5e..0421309 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -712,7 +712,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
 
         cctx.mvcc().addFuture(fut);
 
-        IgniteInternalFuture<?> prepareFut = prepFut.get();
+        final IgniteInternalFuture<?> prepareFut = prepFut.get();
 
         prepareFut.listen(new CI1<IgniteInternalFuture<?>>() {
             @Override public void apply(IgniteInternalFuture<?> f) {
@@ -720,24 +720,15 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
 
                 try {
                     // Make sure that here are no exceptions.
-                    if (!needCheckBackup()) {
-                        f.get();
-
-                        if (finish(true))
-                            fut0.finish();
-                        else
-                            fut0.onError(new IgniteCheckedException("Failed to commit transaction:
" +
-                                CU.txString(GridNearTxLocal.this)));
-                    }
-                    else {
-                        assert onePhaseCommit();
+                    prepareFut.get();
 
-                        fut0.finish();
-                    }
+                    fut0.finish();
                 }
                 catch (Error | RuntimeException e) {
                     commitErr.compareAndSet(null, e);
 
+                    fut0.onError(e);
+
                     throw e;
                 }
                 catch (IgniteCheckedException e) {
@@ -779,15 +770,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
                     log.debug("Got optimistic tx failure [tx=" + this + ", err=" + e + ']');
             }
 
-            try {
-                if (finish(false) || state() == UNKNOWN)
-                    fut.finish();
-                else
-                    fut.onError(new IgniteCheckedException("Failed to gracefully rollback
transaction: " + CU.txString(this)));
-            }
-            catch (IgniteCheckedException e) {
-                fut.onError(e);
-            }
+            fut.finish();
         }
         else {
             prepFut.listen(new CI1<IgniteInternalFuture<?>>() {
@@ -803,19 +786,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
 
                     GridNearTxFinishFuture fut0 = rollbackFut.get();
 
-                    try {
-                        if (finish(false) || state() == UNKNOWN)
-                            fut0.finish();
-                        else
-                            fut0.onError(new IgniteCheckedException("Failed to gracefully
rollback transaction: " +
-                                CU.txString(GridNearTxLocal.this)));
-                    }
-                    catch (IgniteCheckedException e) {
-                        U.error(log, "Failed to gracefully rollback transaction: " +
-                            CU.txString(GridNearTxLocal.this), e);
-
-                        fut0.onError(e);
-                    }
+                    fut0.finish();
                 }
             });
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/f9511aff/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheTxNodeFailureSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheTxNodeFailureSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheTxNodeFailureSelfTest.java
index 773ec25..bca3b6f 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheTxNodeFailureSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheTxNodeFailureSelfTest.java
@@ -23,6 +23,10 @@ import org.apache.ignite.cache.affinity.*;
 import org.apache.ignite.cluster.*;
 import org.apache.ignite.configuration.*;
 import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.managers.communication.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
 import org.apache.ignite.plugin.extensions.communication.*;
 import org.apache.ignite.spi.*;
 import org.apache.ignite.spi.communication.tcp.*;
@@ -30,6 +34,7 @@ import org.apache.ignite.testframework.*;
 import org.apache.ignite.testframework.junits.common.*;
 import org.apache.ignite.transactions.*;
 
+import javax.cache.*;
 import java.util.*;
 import java.util.concurrent.*;
 
@@ -68,34 +73,94 @@ public class GridCacheTxNodeFailureSelfTest extends GridCommonAbstractTest
{
      * @throws Exception If failed.
      */
     public void testPrimaryNodeFailureBackupCommitPessimistic() throws Exception {
-        checkPrimaryNodeFailureBackupCommit(PESSIMISTIC, false);
+        checkPrimaryNodeFailureBackupCommit(PESSIMISTIC, false, true);
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testPrimaryNodeFailureBackupCommitOptimistic() throws Exception {
-        checkPrimaryNodeFailureBackupCommit(OPTIMISTIC, false);
+        checkPrimaryNodeFailureBackupCommit(OPTIMISTIC, false, true);
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testPrimaryNodeFailureBackupCommitPessimisticOnBackup() throws Exception
{
-        checkPrimaryNodeFailureBackupCommit(PESSIMISTIC, true);
+        checkPrimaryNodeFailureBackupCommit(PESSIMISTIC, true, true);
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testPrimaryNodeFailureBackupCommitOptimisticOnBackup() throws Exception {
-        checkPrimaryNodeFailureBackupCommit(OPTIMISTIC, true);
+        checkPrimaryNodeFailureBackupCommit(OPTIMISTIC, true, true);
     }
 
     /**
      * @throws Exception If failed.
      */
-    private void checkPrimaryNodeFailureBackupCommit(final TransactionConcurrency conc, boolean
backup) throws Exception {
+    public void testPrimaryNodeFailureBackupRollbackPessimistic() throws Exception {
+        checkPrimaryNodeFailureBackupCommit(PESSIMISTIC, false, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPrimaryNodeFailureBackupRollbackOptimistic() throws Exception {
+        checkPrimaryNodeFailureBackupCommit(OPTIMISTIC, false, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPrimaryNodeFailureBackupRollbackPessimisticOnBackup() throws Exception
{
+        checkPrimaryNodeFailureBackupCommit(PESSIMISTIC, true, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPrimaryNodeFailureBackupRollbackOptimisticOnBackup() throws Exception
{
+        checkPrimaryNodeFailureBackupCommit(OPTIMISTIC, true, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPrimaryNodeFailureBackupCommitImplicit() throws Exception {
+        checkPrimaryNodeFailureBackupCommit(null, false, true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPrimaryNodeFailureBackupCommitImplicitOnBackup() throws Exception {
+        checkPrimaryNodeFailureBackupCommit(null, true, true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPrimaryNodeFailureBackupRollbackImplicit() throws Exception {
+        checkPrimaryNodeFailureBackupCommit(null, false, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPrimaryNodeFailureBackupRollbackImplicitOnBackup() throws Exception {
+        checkPrimaryNodeFailureBackupCommit(null, true, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    private void checkPrimaryNodeFailureBackupCommit(
+        final TransactionConcurrency conc,
+        boolean backup,
+        final boolean commit
+    ) throws Exception {
         startGrids(gridCount());
         awaitPartitionMapExchange();
 
@@ -111,25 +176,79 @@ public class GridCacheTxNodeFailureSelfTest extends GridCommonAbstractTest
{
 
             final CountDownLatch commitLatch = new CountDownLatch(1);
 
-            if (!backup) {
-                communication(2).bannedClasses(Collections.<Class>singletonList(GridDhtTxPrepareResponse.class));
-                communication(3).bannedClasses(Collections.<Class>singletonList(GridDhtTxPrepareResponse.class));
+            if (!commit) {
+                communication(1).bannedClasses(Collections.<Class>singletonList(GridDhtTxPrepareRequest.class));
+            }
+            else {
+                if (!backup) {
+                    communication(2).bannedClasses(Collections.<Class>singletonList(GridDhtTxPrepareResponse.class));
+                    communication(3).bannedClasses(Collections.<Class>singletonList(GridDhtTxPrepareResponse.class));
+                }
+                else
+                    communication(0).bannedClasses(Collections.<Class>singletonList(GridDhtTxPrepareResponse.class));
             }
-            else
-                communication(0).bannedClasses(Collections.<Class>singletonList(GridDhtTxPrepareResponse.class));
 
             IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>()
{
                 @Override public Object call() throws Exception {
-                    try (Transaction tx = ignite.transactions().txStart(conc, REPEATABLE_READ))
{
-                        cache.put(key, key);
+                    if (conc != null) {
+                        try (Transaction tx = ignite.transactions().txStart(conc, REPEATABLE_READ))
{
+                            cache.put(key, key);
+
+                            Transaction asyncTx = (Transaction)tx.withAsync();
+
+                            asyncTx.commit();
 
-                        Transaction asyncTx = (Transaction)tx.withAsync();
+                            commitLatch.countDown();
 
-                        asyncTx.commit();
+                            try {
+                                IgniteFuture<Object> fut = asyncTx.future();
+
+                                fut.get();
+
+                                if (!commit) {
+                                    error("Transaction has been committed");
+
+                                    fail("Transaction has been committed: " + tx);
+                                }
+                            }
+                            catch (TransactionRollbackException e) {
+                                if (commit) {
+                                    error(e.getMessage(), e);
+
+                                    fail("Failed to commit: " + e);
+                                }
+                                else
+                                    assertTrue(X.hasCause(e, TransactionRollbackException.class));
+                            }
+                        }
+                    }
+                    else {
+                        IgniteCache<Object, Object> cache0 = cache.withAsync();
+
+                        cache0.put(key, key);
+
+                        Thread.sleep(1000);
 
                         commitLatch.countDown();
 
-                        asyncTx.future().get();
+                        try {
+                            cache0.future().get();
+
+                            if (!commit) {
+                                error("Transaction has been committed");
+
+                                fail("Transaction has been committed.");
+                            }
+                        }
+                        catch (CacheException e) {
+                            if (commit) {
+                                error(e.getMessage(), e);
+
+                                fail("Failed to commit: " + e);
+                            }
+                            else
+                                assertTrue(X.hasCause(e, TransactionRollbackException.class));
+                        }
                     }
 
                     return null;
@@ -140,8 +259,11 @@ public class GridCacheTxNodeFailureSelfTest extends GridCommonAbstractTest
{
 
             stopGrid(1);
 
-            // No exception should happen since transaction is committed on the backup node.
+            // Check that thread successfully finished.
             fut.get();
+
+            // Check there are no hanging transactions.
+            assertEquals(0, ((IgniteEx)ignite).context().cache().context().tm().idMapSize());
         }
         finally {
             stopAllGrids();
@@ -194,9 +316,14 @@ public class GridCacheTxNodeFailureSelfTest extends GridCommonAbstractTest
{
         }
 
         /** {@inheritDoc} */
-        @Override public void sendMessage(ClusterNode node, Message msg) throws IgniteSpiException
{
-            if (!bannedClasses.contains(msg.getClass()))
-                super.sendMessage(node, msg);
+        @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException>
ackClosure) throws IgniteSpiException {
+            GridIoMessage ioMsg = (GridIoMessage)msg;
+
+            if (!bannedClasses.contains(ioMsg.message().getClass())) {
+                super.sendMessage(node, msg, ackClosure);
+
+                U.debug(">>> Sending message: " + msg);
+            }
         }
     }
 }


Mime
View raw message