ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [25/47] incubator-ignite git commit: Merge remote-tracking branch 'remotes/origin/ignite-709_2' into ignite-836_2
Date Fri, 15 May 2015 14:02:27 GMT
Merge remote-tracking branch 'remotes/origin/ignite-709_2' into ignite-836_2


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/8c105ec2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/8c105ec2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/8c105ec2

Branch: refs/heads/ignite-709_3
Commit: 8c105ec2379acb78bd5fc9185cc99b9a1bc18562
Parents: 9126679 16e211e
Author: sevdokimov <sevdokimov@gridgain.com>
Authored: Wed May 13 15:09:57 2015 +0300
Committer: sevdokimov <sevdokimov@gridgain.com>
Committed: Wed May 13 15:09:57 2015 +0300

----------------------------------------------------------------------
 assembly/release-base.xml                       |    4 +-
 bin/ignite-schema-import.bat                    |    2 +-
 bin/ignite-schema-import.sh                     |    2 +-
 bin/ignite.bat                                  |    2 +-
 bin/ignite.sh                                   |    2 +-
 bin/ignitevisorcmd.bat                          |    2 +-
 bin/ignitevisorcmd.sh                           |    2 +-
 bin/include/build-classpath.bat                 |   46 +
 bin/include/build-classpath.sh                  |   71 +
 bin/include/functions.sh                        |    2 +-
 bin/include/target-classpath.bat                |   46 -
 bin/include/target-classpath.sh                 |   71 -
 examples/pom.xml                                |    2 +-
 modules/aop/pom.xml                             |    2 +-
 modules/aws/pom.xml                             |    2 +-
 modules/clients/pom.xml                         |    2 +-
 modules/cloud/pom.xml                           |    4 +-
 .../TcpDiscoveryCloudIpFinderSelfTest.java      |    2 -
 modules/codegen/pom.xml                         |    2 +-
 .../ignite/codegen/MessageCodeGenerator.java    |    4 +-
 modules/core/pom.xml                            |    2 +-
 .../internal/direct/DirectByteBufferStream.java |    4 +-
 .../communication/GridIoMessageFactory.java     |    4 +-
 .../eventstorage/GridEventStorageManager.java   |    5 +-
 .../cache/DynamicCacheDescriptor.java           |   16 +-
 .../processors/cache/GridCacheAdapter.java      |  544 +-
 .../cache/GridCacheEvictionManager.java         |    2 +-
 .../processors/cache/GridCacheMapEntry.java     |   18 +-
 .../processors/cache/GridCacheMvccManager.java  |    4 +-
 .../GridCachePartitionExchangeManager.java      |    3 +
 .../processors/cache/GridCacheProcessor.java    |  189 +-
 .../processors/cache/GridCacheProxyImpl.java    |   24 -
 .../processors/cache/GridCacheSwapManager.java  |  215 +-
 .../processors/cache/GridCacheTtlManager.java   |   42 +-
 .../processors/cache/GridCacheUtils.java        |    5 +-
 .../processors/cache/IgniteInternalCache.java   |   27 -
 ...ridCacheOptimisticCheckPreparedTxFuture.java |  434 --
 ...idCacheOptimisticCheckPreparedTxRequest.java |  232 -
 ...dCacheOptimisticCheckPreparedTxResponse.java |  179 -
 .../distributed/GridCacheTxRecoveryFuture.java  |  506 ++
 .../distributed/GridCacheTxRecoveryRequest.java |  261 +
 .../GridCacheTxRecoveryResponse.java            |  182 +
 .../GridDistributedTxRemoteAdapter.java         |    2 +-
 .../distributed/dht/GridDhtLocalPartition.java  |    2 +-
 .../cache/distributed/dht/GridDhtTxLocal.java   |   32 +-
 .../distributed/dht/GridDhtTxLocalAdapter.java  |   27 +
 .../cache/distributed/dht/GridDhtTxMapping.java |    2 +-
 .../distributed/dht/GridDhtTxPrepareFuture.java |   81 +-
 .../dht/GridPartitionedGetFuture.java           |    2 +-
 .../colocated/GridDhtColocatedLockFuture.java   |   27 +-
 .../colocated/GridDhtDetachedCacheEntry.java    |    4 +-
 .../distributed/near/GridNearCacheAdapter.java  |   10 -
 .../distributed/near/GridNearCacheEntry.java    |    4 +-
 .../distributed/near/GridNearLockFuture.java    |    5 -
 .../near/GridNearOptimisticTxPrepareFuture.java |  779 ++
 .../GridNearPessimisticTxPrepareFuture.java     |  349 +
 .../cache/distributed/near/GridNearTxLocal.java |   84 +-
 .../near/GridNearTxPrepareFuture.java           | 1050 ---
 .../near/GridNearTxPrepareFutureAdapter.java    |  226 +
 .../processors/cache/local/GridLocalCache.java  |    8 +-
 .../local/atomic/GridLocalAtomicCache.java      |   27 +-
 .../cache/query/GridCacheQueryManager.java      |   21 +-
 .../cache/query/GridCacheSqlQuery.java          |    2 +-
 .../cache/query/GridCacheTwoStepQuery.java      |   17 +
 .../cache/transactions/IgniteInternalTx.java    |    9 +-
 .../cache/transactions/IgniteTxAdapter.java     |    4 +-
 .../cache/transactions/IgniteTxHandler.java     |  100 +-
 .../transactions/IgniteTxLocalAdapter.java      |   16 +-
 .../cache/transactions/IgniteTxManager.java     |  185 +-
 .../datastreamer/DataStreamerImpl.java          |    2 +
 .../processors/igfs/IgfsDataManager.java        |    3 +
 .../processors/igfs/IgfsDeleteWorker.java       |    4 +
 .../processors/igfs/IgfsMetaManager.java        |    2 +-
 .../internal/processors/igfs/IgfsUtils.java     |   11 +-
 .../offheap/GridOffHeapProcessor.java           |   17 +
 .../processors/resource/GridResourceField.java  |   11 +
 .../processors/resource/GridResourceIoc.java    |  387 +-
 .../processors/resource/GridResourceMethod.java |   13 +
 .../resource/GridResourceProcessor.java         |    4 +-
 .../ignite/internal/util/IgniteUtils.java       |   19 +-
 .../internal/util/future/SettableFuture.java    |   86 +
 .../util/lang/GridFilteredIterator.java         |    2 +-
 .../ignite/internal/util/lang/GridFunc.java     | 7218 +++++-------------
 .../util/offheap/GridOffHeapPartitionedMap.java |    9 +
 .../unsafe/GridUnsafePartitionedMap.java        |  155 +-
 .../internal/visor/query/VisorQueryArg.java     |   14 +-
 .../internal/visor/query/VisorQueryJob.java     |    2 +
 .../discovery/tcp/TcpClientDiscoverySpi.java    |  114 +-
 .../spi/discovery/tcp/TcpDiscoverySpi.java      |  255 +-
 .../discovery/tcp/TcpDiscoverySpiAdapter.java   |   39 +
 .../messages/TcpDiscoveryClientPingRequest.java |   56 +
 .../TcpDiscoveryClientPingResponse.java         |   67 +
 .../resources/META-INF/classnames.properties    |   12 +-
 .../core/src/main/resources/ignite.properties   |    2 +-
 .../internal/GridUpdateNotifierSelfTest.java    |   21 +-
 .../processors/cache/CacheGetFromJobTest.java   |  110 +
 .../GridCacheAbstractFailoverSelfTest.java      |   12 +-
 .../cache/GridCacheAbstractFullApiSelfTest.java |  227 +-
 .../cache/GridCacheAbstractSelfTest.java        |    4 +-
 .../cache/OffHeapTieredTransactionSelfTest.java |  127 +
 .../GridCacheAbstractNodeRestartSelfTest.java   |  101 +-
 .../distributed/IgniteTxGetAfterStopTest.java   |  131 +
 ...xOriginatingNodeFailureAbstractSelfTest.java |    2 +-
 ...icOffHeapTieredMultiNodeFullApiSelfTest.java |   43 +
 ...ionedNearDisabledOffHeapFullApiSelfTest.java |    8 +-
 ...DisabledOffHeapMultiNodeFullApiSelfTest.java |    8 +-
 ...abledOffHeapTieredAtomicFullApiSelfTest.java |   56 +
 ...earDisabledOffHeapTieredFullApiSelfTest.java |   33 +
 ...edOffHeapTieredMultiNodeFullApiSelfTest.java |   33 +
 ...rDisabledPrimaryNodeFailureRecoveryTest.java |   31 +
 ...rtitionedPrimaryNodeFailureRecoveryTest.java |   31 +
 ...woBackupsPrimaryNodeFailureRecoveryTest.java |   37 +
 ...ePrimaryNodeFailureRecoveryAbstractTest.java |  533 ++
 ...idCacheAtomicReplicatedFailoverSelfTest.java |    6 +
 ...CacheAtomicOffHeapTieredFullApiSelfTest.java |   32 +
 ...icOffHeapTieredMultiNodeFullApiSelfTest.java |   33 +
 ...yWriteOrderOffHeapTieredFullApiSelfTest.java |   33 +
 ...erOffHeapTieredMultiNodeFullApiSelfTest.java |   33 +
 ...achePartitionedMultiNodeFullApiSelfTest.java |   15 +-
 .../GridCachePartitionedNodeRestartTest.java    |    4 +-
 ...dCachePartitionedOffHeapFullApiSelfTest.java |    8 +-
 ...titionedOffHeapMultiNodeFullApiSelfTest.java |    8 +-
 ...PartitionedOffHeapTieredFullApiSelfTest.java |   32 +
 ...edOffHeapTieredMultiNodeFullApiSelfTest.java |   72 +
 ...ePartitionedOptimisticTxNodeRestartTest.java |    4 +-
 .../GridCachePartitionedTxSalvageSelfTest.java  |   25 +-
 .../GridCacheReplicatedFailoverSelfTest.java    |    6 +
 .../GridCacheReplicatedNodeRestartSelfTest.java |   82 +
 ...idCacheReplicatedOffHeapFullApiSelfTest.java |    8 +-
 ...plicatedOffHeapMultiNodeFullApiSelfTest.java |    8 +-
 ...eReplicatedOffHeapTieredFullApiSelfTest.java |   33 +
 ...edOffHeapTieredMultiNodeFullApiSelfTest.java |   33 +
 .../IgniteCacheExpiryPolicyAbstractTest.java    |    2 +-
 .../IgniteCacheExpiryPolicyTestSuite.java       |    2 +
 .../expiry/IgniteCacheTtlCleanupSelfTest.java   |   85 +
 ...LocalAtomicOffHeapTieredFullApiSelfTest.java |   32 +
 .../GridCacheLocalOffHeapFullApiSelfTest.java   |    6 +-
 ...dCacheLocalOffHeapTieredFullApiSelfTest.java |   32 +
 .../igfs/IgfsClientCacheSelfTest.java           |  132 +
 .../processors/igfs/IgfsOneClientNodeTest.java  |  133 +
 .../processors/igfs/IgfsStreamsSelfTest.java    |    2 +-
 .../tcp/TcpClientDiscoverySelfTest.java         | 1021 ---
 .../TcpClientDiscoverySpiConfigSelfTest.java    |    2 +-
 .../tcp/TcpClientDiscoverySpiSelfTest.java      | 1089 +++
 .../ignite/testsuites/IgniteBasicTestSuite.java |    1 +
 .../IgniteCacheFailoverTestSuite.java           |   10 +-
 .../IgniteCacheFullApiSelfTestSuite.java        |   18 +
 .../testsuites/IgniteCacheRestartTestSuite.java |   11 +-
 .../testsuites/IgniteCacheTestSuite3.java       |    5 +-
 .../IgniteCacheTxRecoverySelfTestSuite.java     |    4 +
 .../ignite/testsuites/IgniteIgfsTestSuite.java  |    3 +
 .../IgniteSpiDiscoverySelfTestSuite.java        |    2 +-
 modules/extdata/p2p/pom.xml                     |    2 +-
 modules/extdata/uri/pom.xml                     |    2 +-
 modules/gce/pom.xml                             |    4 +-
 modules/geospatial/pom.xml                      |    2 +-
 modules/hadoop/pom.xml                          |    2 +-
 modules/hibernate/pom.xml                       |    2 +-
 modules/indexing/pom.xml                        |    2 +-
 .../processors/query/h2/IgniteH2Indexing.java   |    4 +
 .../processors/query/h2/sql/GridSqlQuery.java   |   20 +
 .../query/h2/sql/GridSqlQueryParser.java        |   10 +-
 .../query/h2/sql/GridSqlQuerySplitter.java      |   11 +-
 .../processors/query/h2/sql/GridSqlSelect.java  |    2 +-
 .../processors/query/h2/sql/GridSqlUnion.java   |    2 +-
 .../query/h2/twostep/GridMapQueryExecutor.java  |    3 +
 .../h2/twostep/GridReduceQueryExecutor.java     |  119 +-
 .../cache/GridCacheOffheapIndexGetSelfTest.java |  111 +
 .../IgniteCacheAbstractFieldsQuerySelfTest.java |   21 +
 ...eQueryMultiThreadedOffHeapTiredSelfTest.java |   37 +
 .../IgniteCacheQueryMultiThreadedSelfTest.java  |   29 +-
 .../IgniteCacheQuerySelfTestSuite.java          |    1 +
 .../IgniteCacheWithIndexingTestSuite.java       |    2 +
 modules/jcl/pom.xml                             |    2 +-
 modules/jta/pom.xml                             |    2 +-
 modules/log4j/pom.xml                           |    2 +-
 modules/rest-http/pom.xml                       |    2 +-
 modules/scalar/pom.xml                          |    2 +-
 .../ignite/scalar/ScalarConversions.scala       |    8 -
 modules/schedule/pom.xml                        |    2 +-
 modules/schema-import/pom.xml                   |    2 +-
 .../ignite/schema/generator/CodeGenerator.java  |   41 +-
 modules/slf4j/pom.xml                           |    2 +-
 modules/spring/pom.xml                          |    2 +-
 modules/ssh/pom.xml                             |    2 +-
 modules/tools/pom.xml                           |    2 +-
 modules/urideploy/pom.xml                       |    2 +-
 modules/visor-console/pom.xml                   |    2 +-
 .../commands/cache/VisorCacheScanCommand.scala  |    2 +-
 modules/visor-plugins/pom.xml                   |    2 +-
 modules/web/pom.xml                             |    2 +-
 modules/yardstick/pom.xml                       |    2 +-
 parent/pom.xml                                  |    2 +
 pom.xml                                         |  115 +-
 194 files changed, 9825 insertions(+), 10127 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8c105ec2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8c105ec2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8c105ec2/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8c105ec2/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index ad78f86,53522c7..134097b
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@@ -4476,74 -4517,76 +4514,110 @@@ public class TcpDiscoverySpi extends Tc
          /**
           * @param msg Message.
           */
+         private void processClientPingRequest(final TcpDiscoveryClientPingRequest msg) {
+             utilityPool.execute(new Runnable() {
+                 @Override public void run() {
+                     boolean res = pingNode(msg.nodeToPing());
+ 
+                     final ClientMessageWorker worker = clientMsgWorkers.get(msg.creatorNodeId());
+ 
+                     if (worker == null) {
+                         if (log.isDebugEnabled())
+                             log.debug("Ping request from dead client node, will be skipped: " + msg.creatorNodeId());
+                     }
+                     else {
+                         TcpDiscoveryClientPingResponse pingRes = new TcpDiscoveryClientPingResponse(
+                             getLocalNodeId(), msg.nodeToPing(), res);
+ 
+                         pingRes.verify(getLocalNodeId());
+ 
+                         worker.addMessage(pingRes);
+                     }
+                 }
+             });
+         }
+ 
+         /**
+          * @param msg Message.
+          */
+         private void processPingResponse(final TcpDiscoveryPingResponse msg) {
+             ClientMessageWorker clientWorker = clientMsgWorkers.get(msg.creatorNodeId());
+ 
+             if (clientWorker != null)
+                 clientWorker.pingResult(true);
+         }
+ 
+         /**
+          * @param msg Message.
+          */
          private void processCustomMessage(TcpDiscoveryCustomEventMessage msg) {
              if (isLocalNodeCoordinator()) {
 -                if (msg.verified()) {
 +                boolean sndNext;
 +
 +                if (!msg.verified()) {
 +                    msg.verify(getLocalNodeId());
 +                    msg.topologyVersion(ring.topologyVersion());
 +
 +                    notifyDiscoveryListener(msg);
 +
 +                    sndNext = true;
 +                }
 +                else
 +                    sndNext = false;
 +
 +                if (sndNext && ring.hasRemoteNodes())
 +                    sendMessageAcrossRing(msg);
 +                else {
                      stats.onRingMessageReceived(msg);
  
 -                    addMessage(new TcpDiscoveryDiscardMessage(getLocalNodeId(), msg.id()));
 +                    try {
 +                        DiscoverySpiCustomMessage msgObj = marsh.unmarshal(msg.messageBytes(), U.gridClassLoader());
  
 -                    return;
 +                        DiscoverySpiCustomMessage nextMsg = msgObj.newMessageOnRingEnd();
 +
 +                        if (nextMsg != null)
 +                            addMessage(new TcpDiscoveryCustomEventMessage(getLocalNodeId(), marsh.marshal(nextMsg)));
 +                    }
 +                    catch (IgniteCheckedException e) {
 +                        U.error(log, "Failed to unmarshal discovery custom message.", e);
 +                    }
 +
 +                    addMessage(new TcpDiscoveryDiscardMessage(getLocalNodeId(), msg.id()));
                  }
 +            }
 +            else {
 +                if (msg.verified())
 +                    notifyDiscoveryListener(msg);
  
 -                msg.verify(getLocalNodeId());
 -                msg.topologyVersion(ring.topologyVersion());
 +                if (ring.hasRemoteNodes())
 +                    sendMessageAcrossRing(msg);
              }
 +        }
  
 -            if (msg.verified()) {
 -                DiscoverySpiListener lsnr = TcpDiscoverySpi.this.lsnr;
 +        /**
 +         * @param msg Custom message.
 +         */
 +        private void notifyDiscoveryListener(TcpDiscoveryCustomEventMessage msg) {
 +            DiscoverySpiListener lsnr = TcpDiscoverySpi.this.lsnr;
  
 -                TcpDiscoverySpiState spiState = spiStateCopy();
 +            TcpDiscoverySpiState spiState = spiStateCopy();
  
 -                Map<Long, Collection<ClusterNode>> hist;
 +            Map<Long, Collection<ClusterNode>> hist;
  
 -                synchronized (mux) {
 -                    hist = new TreeMap<>(topHist);
 -                }
 +            synchronized (mux) {
 +                hist = new TreeMap<>(topHist);
 +            }
  
 -                Collection<ClusterNode> snapshot = hist.get(msg.topologyVersion());
 +            Collection<ClusterNode> snapshot = hist.get(msg.topologyVersion());
  
 -                if (lsnr != null && (spiState == CONNECTED || spiState == DISCONNECTING)) {
 -                    assert msg.messageBytes() != null;
 +            if (lsnr != null && (spiState == CONNECTED || spiState == DISCONNECTING)) {
 +                assert msg.messageBytes() != null;
  
 -                    TcpDiscoveryNode node = ring.node(msg.creatorNodeId());
 +                TcpDiscoveryNode node = ring.node(msg.creatorNodeId());
  
 +                if (node != null) {
                      try {
 -                        Serializable msgObj = marsh.unmarshal(msg.messageBytes(), U.gridClassLoader());
 +                        DiscoverySpiCustomMessage msgObj = marsh.unmarshal(msg.messageBytes(), U.gridClassLoader());
  
                          lsnr.onDiscovery(DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT,
                              msg.topologyVersion(),

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8c105ec2/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
----------------------------------------------------------------------
diff --cc modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
index 0000000,507b3e7..da40d4e
mode 000000,100644..100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
@@@ -1,0 -1,1089 +1,1089 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *      http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.ignite.spi.discovery.tcp;
+ 
+ import org.apache.ignite.*;
+ import org.apache.ignite.cluster.*;
+ import org.apache.ignite.configuration.*;
+ import org.apache.ignite.events.*;
+ import org.apache.ignite.internal.*;
+ import org.apache.ignite.internal.util.*;
+ import org.apache.ignite.internal.util.io.*;
+ import org.apache.ignite.internal.util.typedef.*;
+ import org.apache.ignite.internal.util.typedef.internal.*;
+ import org.apache.ignite.lang.*;
+ import org.apache.ignite.resources.*;
+ import org.apache.ignite.spi.*;
+ import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+ import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+ import org.apache.ignite.spi.discovery.tcp.messages.*;
+ import org.apache.ignite.testframework.*;
+ import org.apache.ignite.testframework.junits.common.*;
+ import org.jetbrains.annotations.*;
+ 
+ import java.io.*;
+ import java.net.*;
+ import java.util.*;
+ import java.util.concurrent.*;
+ import java.util.concurrent.atomic.*;
+ 
+ import static java.util.concurrent.TimeUnit.*;
+ import static org.apache.ignite.events.EventType.*;
+ 
+ /**
+  * Client-based discovery tests.
+  */
+ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
+     /** */
+     private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+ 
+     /** */
+     private static final AtomicInteger srvIdx = new AtomicInteger();
+ 
+     /** */
+     private static final AtomicInteger clientIdx = new AtomicInteger();
+ 
+     /** */
+     private static Collection<UUID> srvNodeIds;
+ 
+     /** */
+     private static Collection<UUID> clientNodeIds;
+ 
+     /** */
+     private static int clientsPerSrv;
+ 
+     /** */
+     private static CountDownLatch srvJoinedLatch;
+ 
+     /** */
+     private static CountDownLatch srvLeftLatch;
+ 
+     /** */
+     private static CountDownLatch srvFailedLatch;
+ 
+     /** */
+     private static CountDownLatch clientJoinedLatch;
+ 
+     /** */
+     private static CountDownLatch clientLeftLatch;
+ 
+     /** */
+     private static CountDownLatch clientFailedLatch;
+ 
+     /** */
+     private static CountDownLatch msgLatch;
+ 
+     /** */
+     private UUID nodeId;
+ 
+     /** */
+     private TcpDiscoveryVmIpFinder clientIpFinder;
+ 
+     /** */
+     private long joinTimeout = TcpClientDiscoverySpi.DFLT_JOIN_TIMEOUT;
+ 
+     /** {@inheritDoc} */
+     @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+         IgniteConfiguration cfg = super.getConfiguration(gridName);
+ 
+         cfg.setLocalHost("127.0.0.1");
+ 
+         if (gridName.startsWith("server")) {
+             TcpDiscoverySpi disco = new TcpDiscoverySpi();
+ 
+             disco.setIpFinder(IP_FINDER);
+ 
+             cfg.setDiscoverySpi(disco);
+         }
+         else if (gridName.startsWith("client")) {
+             TcpClientDiscoverySpi disco = new TestTcpClientDiscovery();
+ 
+             disco.setJoinTimeout(joinTimeout);
+ 
+             TcpDiscoveryVmIpFinder ipFinder;
+ 
+             if (clientIpFinder != null)
+                 ipFinder = clientIpFinder;
+             else {
+                 ipFinder = new TcpDiscoveryVmIpFinder();
+ 
+                 String addr = new ArrayList<>(IP_FINDER.getRegisteredAddresses()).
+                     get((clientIdx.get() - 1) / clientsPerSrv).toString();
+ 
+                 if (addr.startsWith("/"))
+                     addr = addr.substring(1);
+ 
+                 ipFinder.setAddresses(Arrays.asList(addr));
+             }
+ 
+             disco.setIpFinder(ipFinder);
+ 
+             cfg.setDiscoverySpi(disco);
+ 
+             String nodeId = cfg.getNodeId().toString();
+ 
+             nodeId = "cc" + nodeId.substring(2);
+ 
+             cfg.setNodeId(UUID.fromString(nodeId));
+         }
+ 
+         if (nodeId != null)
+             cfg.setNodeId(nodeId);
+ 
+         return cfg;
+     }
+ 
+     /** {@inheritDoc} */
+     @Override protected void beforeTest() throws Exception {
+         Collection<InetSocketAddress> addrs = IP_FINDER.getRegisteredAddresses();
+ 
+         if (!F.isEmpty(addrs))
+             IP_FINDER.unregisterAddresses(addrs);
+ 
+         srvIdx.set(0);
+         clientIdx.set(0);
+ 
+         srvNodeIds = new GridConcurrentHashSet<>();
+         clientNodeIds = new GridConcurrentHashSet<>();
+ 
+         clientsPerSrv = 2;
+     }
+ 
+     /** {@inheritDoc} */
+     @Override protected void afterTest() throws Exception {
+         stopAllClients(true);
+         stopAllServers(true);
+ 
+         nodeId = null;
+         clientIpFinder = null;
+         joinTimeout = TcpClientDiscoverySpi.DFLT_JOIN_TIMEOUT;
+ 
+         assert G.allGrids().isEmpty();
+     }
+ 
+     /**
+      *
+      * @throws Exception
+      */
+     public void testJoinTimeout() throws Exception {
+         clientIpFinder = new TcpDiscoveryVmIpFinder();
+         joinTimeout = 1000;
+ 
+         try {
+             startClientNodes(1);
+ 
+             fail("Client cannot be start because no server nodes run");
+         }
+         catch (IgniteCheckedException e) {
+             IgniteSpiException spiEx = e.getCause(IgniteSpiException.class);
+ 
+             assert spiEx != null : e;
+ 
+             assert spiEx.getMessage().contains("Join process timed out") : spiEx.getMessage();
+         }
+     }
+ 
+     /**
+      * @throws Exception If failed.
+      */
+     public void testClientNodeJoin() throws Exception {
+         startServerNodes(3);
+         startClientNodes(3);
+ 
+         checkNodes(3, 3);
+ 
+         srvJoinedLatch = new CountDownLatch(3);
+         clientJoinedLatch = new CountDownLatch(3);
+ 
+         attachListeners(3, 3);
+ 
+         startClientNodes(1);
+ 
+         await(srvJoinedLatch);
+         await(clientJoinedLatch);
+ 
+         checkNodes(3, 4);
+     }
+ 
+     /**
+      * @throws Exception If failed.
+      */
+     public void testClientNodeLeave() throws Exception {
+         startServerNodes(3);
+         startClientNodes(3);
+ 
+         checkNodes(3, 3);
+ 
+         srvLeftLatch = new CountDownLatch(3);
+         clientLeftLatch = new CountDownLatch(2);
+ 
+         attachListeners(3, 3);
+ 
+         stopGrid("client-2");
+ 
+         await(srvLeftLatch);
+         await(clientLeftLatch);
+ 
+         checkNodes(3, 2);
+     }
+ 
+     /**
+      * @throws Exception If failed.
+      */
+     public void testClientNodeFail() throws Exception {
+         startServerNodes(3);
+         startClientNodes(3);
+ 
+         checkNodes(3, 3);
+ 
+         srvFailedLatch = new CountDownLatch(3);
+         clientFailedLatch = new CountDownLatch(2);
+ 
+         attachListeners(3, 3);
+ 
+         failClient(2);
+ 
+         await(srvFailedLatch);
+         await(clientFailedLatch);
+ 
+         checkNodes(3, 2);
+     }
+ 
+     /**
+      * @throws Exception If failed.
+      */
+     public void testServerNodeJoin() throws Exception {
+         startServerNodes(3);
+         startClientNodes(3);
+ 
+         checkNodes(3, 3);
+ 
+         srvJoinedLatch = new CountDownLatch(3);
+         clientJoinedLatch = new CountDownLatch(3);
+ 
+         attachListeners(3, 3);
+ 
+         startServerNodes(1);
+ 
+         await(srvJoinedLatch);
+         await(clientJoinedLatch);
+ 
+         checkNodes(4, 3);
+     }
+ 
+     /**
+      * @throws Exception If failed.
+      */
+     public void testServerNodeLeave() throws Exception {
+         startServerNodes(3);
+         startClientNodes(3);
+ 
+         checkNodes(3, 3);
+ 
+         srvLeftLatch = new CountDownLatch(2);
+         clientLeftLatch = new CountDownLatch(3);
+ 
+         attachListeners(3, 3);
+ 
+         stopGrid("server-2");
+ 
+         await(srvLeftLatch);
+         await(clientLeftLatch);
+ 
+         checkNodes(2, 3);
+     }
+ 
+     /**
+      * @throws Exception If failed.
+      */
+     public void testServerNodeFail() throws Exception {
+         startServerNodes(3);
+         startClientNodes(3);
+ 
+         checkNodes(3, 3);
+ 
+         srvFailedLatch = new CountDownLatch(2);
+         clientFailedLatch = new CountDownLatch(3);
+ 
+         attachListeners(3, 3);
+ 
+         assert U.<Map>field(G.ignite("server-2").configuration().getDiscoverySpi(), "clientMsgWorkers").isEmpty();
+ 
+         failServer(2);
+ 
+         await(srvFailedLatch);
+         await(clientFailedLatch);
+ 
+         checkNodes(2, 3);
+     }
+ 
+     /**
+      * @throws Exception If failed.
+      */
+     public void testPing() throws Exception {
+         startServerNodes(2);
+         startClientNodes(1);
+ 
+         Ignite srv0 = G.ignite("server-0");
+         Ignite srv1 = G.ignite("server-1");
+         Ignite client = G.ignite("client-0");
+ 
+         assert ((IgniteEx)srv0).context().discovery().pingNode(client.cluster().localNode().id());
+         assert ((IgniteEx)srv1).context().discovery().pingNode(client.cluster().localNode().id());
+ 
+         assert ((IgniteEx)client).context().discovery().pingNode(srv0.cluster().localNode().id());
+         assert ((IgniteEx)client).context().discovery().pingNode(srv1.cluster().localNode().id());
+     }
+ 
+     /**
+      * @throws Exception If failed.
+      */
+     public void testPingFailedNodeFromClient() throws Exception {
+         startServerNodes(2);
+         startClientNodes(1);
+ 
+         Ignite srv0 = G.ignite("server-0");
+         Ignite srv1 = G.ignite("server-1");
+         Ignite client = G.ignite("client-0");
+ 
+         final CountDownLatch latch = new CountDownLatch(1);
+ 
+         ((TcpDiscoverySpi)srv1.configuration().getDiscoverySpi()).addIncomeConnectionListener(new IgniteInClosure<Socket>() {
+             @Override public void apply(Socket sock) {
+                 try {
+                     latch.await();
+                 }
+                 catch (InterruptedException e) {
+                     throw new RuntimeException(e);
+                 }
+             }
+         });
+ 
+         assert ((IgniteEx)client).context().discovery().pingNode(srv0.cluster().localNode().id());
+         assert !((IgniteEx)client).context().discovery().pingNode(srv1.cluster().localNode().id());
+ 
+         latch.countDown();
+     }
+ 
+     /**
+      * @throws Exception If failed.
+      */
+     public void testPingFailedClientNode() throws Exception {
+         startServerNodes(2);
+         startClientNodes(1);
+ 
+         Ignite srv0 = G.ignite("server-0");
+         Ignite srv1 = G.ignite("server-1");
+         Ignite client = G.ignite("client-0");
+ 
+         ((TcpDiscoverySpiAdapter)srv0.configuration().getDiscoverySpi()).setAckTimeout(1000);
+ 
+         ((TestTcpClientDiscovery)client.configuration().getDiscoverySpi()).pauseSocketWrite();
+ 
+         assert !((IgniteEx)srv1).context().discovery().pingNode(client.cluster().localNode().id());
+         assert !((IgniteEx)srv0).context().discovery().pingNode(client.cluster().localNode().id());
+ 
+         ((TestTcpClientDiscovery)client.configuration().getDiscoverySpi()).resumeAll();
+ 
+         assert ((IgniteEx)srv1).context().discovery().pingNode(client.cluster().localNode().id());
+         assert ((IgniteEx)srv0).context().discovery().pingNode(client.cluster().localNode().id());
+     }
+ 
+     /**
+      * @throws Exception If failed.
+      */
+     public void testClientReconnectOnRouterFail() throws Exception {
+         clientsPerSrv = 1;
+ 
+         startServerNodes(3);
+         startClientNodes(3);
+ 
+         checkNodes(3, 3);
+ 
+         setClientRouter(2, 0);
+ 
+         srvFailedLatch = new CountDownLatch(2);
+         clientFailedLatch = new CountDownLatch(3);
+ 
+         attachListeners(2, 3);
+ 
+         failServer(2);
+ 
+         await(srvFailedLatch);
+         await(clientFailedLatch);
+ 
+         checkNodes(2, 3);
+     }
+ 
+     /**
+      * @throws Exception If failed.
+      */
+     public void testClientReconnectOnNetworkProblem() throws Exception {
+         clientsPerSrv = 1;
+ 
+         startServerNodes(3);
+         startClientNodes(3);
+ 
+         checkNodes(3, 3);
+ 
+         setClientRouter(2, 0);
+ 
+         srvFailedLatch = new CountDownLatch(2);
+         clientFailedLatch = new CountDownLatch(3);
+ 
+         attachListeners(2, 3);
+ 
+         ((TcpClientDiscoverySpi)G.ignite("client-2").configuration().getDiscoverySpi()).brokeConnection();
+ 
+         G.ignite("client-2").message().remoteListen(null, new MessageListener()); // Send some discovery message.
+ 
+         checkNodes(3, 3);
+     }
+ 
+     /**
+      * @throws Exception If failed.
+      */
+     public void testGetMissedMessagesOnReconnect() throws Exception {
+         clientsPerSrv = 1;
+ 
+         startServerNodes(3);
+         startClientNodes(2);
+ 
+         checkNodes(3, 2);
+ 
+         clientLeftLatch = new CountDownLatch(1);
+         srvLeftLatch = new CountDownLatch(2);
+ 
+         attachListeners(2, 2);
+ 
+         ((TestTcpClientDiscovery)G.ignite("client-1").configuration().getDiscoverySpi()).pauseAll();
+ 
+         stopGrid("server-2");
+ 
+         await(srvLeftLatch);
+         await(srvLeftLatch);
+ 
+         Thread.sleep(500);
+ 
+         assert G.ignite("client-0").cluster().nodes().size() == 4;
+         assert G.ignite("client-1").cluster().nodes().size() == 5;
+ 
+         clientLeftLatch = new CountDownLatch(1);
+ 
+         ((TestTcpClientDiscovery)G.ignite("client-1").configuration().getDiscoverySpi()).resumeAll();
+ 
+         await(clientLeftLatch);
+ 
+         checkNodes(2, 2);
+     }
+ 
+     /**
+      * @throws Exception If failed.
+      */
+     public void testClientSegmentation() throws Exception {
+         clientsPerSrv = 1;
+ 
+         startServerNodes(3);
+         startClientNodes(3);
+ 
+         checkNodes(3, 3);
+ 
+ //        setClientRouter(2, 2);
+ 
+         srvFailedLatch = new CountDownLatch(2 + 2);
+         clientFailedLatch = new CountDownLatch(2 + 2);
+ 
+         attachListeners(2, 2);
+ 
+         final CountDownLatch client2StoppedLatch = new CountDownLatch(1);
+ 
+         IgnitionListener lsnr = new IgnitionListener() {
+             @Override public void onStateChange(@Nullable String name, IgniteState state) {
+                 if (state == IgniteState.STOPPED_ON_SEGMENTATION)
+                     client2StoppedLatch.countDown();
+             }
+         };
+         G.addListener(lsnr);
+ 
+         try {
+             failServer(2);
+ 
+             await(srvFailedLatch);
+             await(clientFailedLatch);
+ 
+             await(client2StoppedLatch);
+ 
+             checkNodes(2, 2);
+         }
+         finally {
+             G.removeListener(lsnr);
+         }
+     }
+ 
+     /**
+      * @throws Exception If failed.
+      */
+     public void testClientNodeJoinOneServer() throws Exception {
+         startServerNodes(1);
+ 
+         srvJoinedLatch = new CountDownLatch(1);
+ 
+         attachListeners(1, 0);
+ 
+         startClientNodes(1);
+ 
+         await(srvJoinedLatch);
+ 
+         checkNodes(1, 1);
+     }
+ 
+     /**
+      * @throws Exception If failed.
+      */
+     public void testClientNodeLeaveOneServer() throws Exception {
+         startServerNodes(1);
+         startClientNodes(1);
+ 
+         checkNodes(1, 1);
+ 
+         srvLeftLatch = new CountDownLatch(1);
+ 
+         attachListeners(1, 0);
+ 
+         stopGrid("client-0");
+ 
+         await(srvLeftLatch);
+ 
+         checkNodes(1, 0);
+     }
+ 
+     /**
+      * @throws Exception If failed.
+      */
+     public void testClientNodeFailOneServer() throws Exception {
+         startServerNodes(1);
+         startClientNodes(1);
+ 
+         checkNodes(1, 1);
+ 
+         srvFailedLatch = new CountDownLatch(1);
+ 
+         attachListeners(1, 0);
+ 
+         failClient(0);
+ 
+         await(srvFailedLatch);
+ 
+         checkNodes(1, 0);
+     }
+ 
+     /**
+      * @throws Exception If failed.
+      */
+     public void testMetrics() throws Exception {
+         startServerNodes(3);
+         startClientNodes(3);
+ 
+         checkNodes(3, 3);
+ 
+         attachListeners(3, 3);
+ 
+         assertTrue(checkMetrics(3, 3, 0));
+ 
+         G.ignite("client-0").compute().broadcast(F.noop());
+ 
+         assertTrue(GridTestUtils.waitForCondition(new PA() {
+             @Override public boolean apply() {
+                 return checkMetrics(3, 3, 1);
+             }
+         }, 10000));
+ 
+         checkMetrics(3, 3, 1);
+ 
+         G.ignite("server-0").compute().broadcast(F.noop());
+ 
+         assertTrue(GridTestUtils.waitForCondition(new PA() {
+             @Override public boolean apply() {
+                 return checkMetrics(3, 3, 2);
+             }
+         }, 10000));
+     }
+ 
+     /**
+      * @param srvCnt Number of Number of server nodes.
+      * @param clientCnt Number of client nodes.
+      * @param execJobsCnt Expected number of executed jobs.
+      * @return Whether metrics are correct.
+      */
+     private boolean checkMetrics(int srvCnt, int clientCnt, int execJobsCnt) {
+         for (int i = 0; i < srvCnt; i++) {
+             Ignite g = G.ignite("server-" + i);
+ 
+             for (ClusterNode n : g.cluster().nodes()) {
+                 if (n.metrics().getTotalExecutedJobs() != execJobsCnt)
+                     return false;
+             }
+         }
+ 
+         for (int i = 0; i < clientCnt; i++) {
+             Ignite g = G.ignite("client-" + i);
+ 
+             for (ClusterNode n : g.cluster().nodes()) {
+                 if (n.metrics().getTotalExecutedJobs() != execJobsCnt)
+                     return false;
+             }
+         }
+ 
+         return true;
+     }
+ 
+     /**
+      * @throws Exception If failed.
+      */
+     public void testDataExchangeFromServer() throws Exception {
+         testDataExchange("server-0");
+     }
+ 
+     /**
+      * TODO: IGNITE-587.
+      *
+      * @throws Exception If failed.
+      */
+     public void testDataExchangeFromClient() throws Exception {
+         testDataExchange("client-0");
+     }
+ 
+     /**
+      * @throws Exception If failed.
+      */
+     private void testDataExchange(String masterName) throws Exception {
+         startServerNodes(2);
+         startClientNodes(2);
+ 
+         checkNodes(2, 2);
+ 
+         IgniteMessaging msg = grid(masterName).message();
+ 
+         UUID id = null;
+ 
+         try {
+             id = msg.remoteListen(null, new MessageListener());
+ 
 -            msgLatch = new CountDownLatch(4);
++            msgLatch = new CountDownLatch(2);
+ 
+             msg.send(null, "Message 1");
+ 
+             await(msgLatch);
+ 
+             startServerNodes(1);
+             startClientNodes(1);
+ 
+             checkNodes(3, 3);
+ 
 -            msgLatch = new CountDownLatch(6);
++            msgLatch = new CountDownLatch(3);
+ 
+             msg.send(null, "Message 2");
+ 
+             await(msgLatch);
+         }
+         finally {
+             if (id != null)
+                 msg.stopRemoteListen(id);
+         }
+     }
+ 
+     /**
+      * @throws Exception If any error occurs.
+      */
+     public void testDuplicateId() throws Exception {
+         startServerNodes(2);
+ 
+         nodeId = G.ignite("server-1").cluster().localNode().id();
+ 
+         try {
+             startGrid("client-0");
+ 
+             assert false;
+         }
+         catch (IgniteCheckedException e) {
+             IgniteSpiException spiEx = e.getCause(IgniteSpiException.class);
+ 
+             assert spiEx != null : e;
+             assert spiEx.getMessage().contains("same ID") : spiEx.getMessage();
+         }
+     }
+ 
+     /**
+      * @throws Exception If any error occurs.
+      */
+     public void testTimeoutWaitingNodeAddedMessage() throws Exception {
+         startServerNodes(2);
+ 
+         final CountDownLatch cnt = new CountDownLatch(1);
+ 
+         ((TcpDiscoverySpi)G.ignite("server-1").configuration().getDiscoverySpi()).addSendMessageListener(
+             new IgniteInClosure<TcpDiscoveryAbstractMessage>() {
+                 @Override public void apply(TcpDiscoveryAbstractMessage msg) {
+                     try {
+                         cnt.await(10, MINUTES);
+                     }
+                     catch (InterruptedException e) {
+                         Thread.currentThread().interrupt();
+ 
+                         throw new IgniteInterruptedException(e);
+                     }
+                 }
+             });
+ 
+         try {
+             startGrid("client-0");
+ 
+             assert false;
+         }
+         catch (IgniteCheckedException e) {
+             cnt.countDown();
+ 
+             IgniteSpiException spiEx = e.getCause(IgniteSpiException.class);
+ 
+             assert spiEx != null : e;
+             assert spiEx.getMessage().contains("Join process timed out") : spiEx.getMessage();
+         }
+     }
+ 
+     /**
+      * @throws Exception If any error occurs.
+      */
+     public void testGridStartTime() throws Exception {
+         startServerNodes(2);
+ 
+         startClientNodes(2);
+ 
+         long startTime = -1;
+ 
+         for (Ignite g : G.allGrids()) {
+             IgniteEx kernal = (IgniteKernal)g;
+ 
+             assertTrue(kernal.context().discovery().gridStartTime() > 0);
+ 
+             if (startTime == -1)
+                 startTime = kernal.context().discovery().gridStartTime();
+             else
+                 assertEquals(startTime, kernal.context().discovery().gridStartTime());
+         }
+     }
+ 
+     /**
+      * @param clientIdx Index.
+      * @throws Exception In case of error.
+      */
+     private void setClientRouter(int clientIdx, int srvIdx) throws Exception {
+         TcpClientDiscoverySpi disco =
+             (TcpClientDiscoverySpi)G.ignite("client-" + clientIdx).configuration().getDiscoverySpi();
+ 
+         TcpDiscoveryVmIpFinder ipFinder = (TcpDiscoveryVmIpFinder)disco.getIpFinder();
+ 
+         String addr = new ArrayList<>(IP_FINDER.getRegisteredAddresses()).get(srvIdx).toString();
+ 
+         if (addr.startsWith("/"))
+             addr = addr.substring(1);
+ 
+         ipFinder.setAddresses(Arrays.asList(addr));
+     }
+ 
+     /**
+      * @param cnt Number of nodes.
+      * @throws Exception In case of error.
+      */
+     private void startServerNodes(int cnt) throws Exception {
+         for (int i = 0; i < cnt; i++) {
+             Ignite g = startGrid("server-" + srvIdx.getAndIncrement());
+ 
+             srvNodeIds.add(g.cluster().localNode().id());
+         }
+     }
+ 
+     /**
+      * @param cnt Number of nodes.
+      * @throws Exception In case of error.
+      */
+     private void startClientNodes(int cnt) throws Exception {
+         for (int i = 0; i < cnt; i++) {
+             Ignite g = startGrid("client-" + clientIdx.getAndIncrement());
+ 
+             clientNodeIds.add(g.cluster().localNode().id());
+         }
+     }
+ 
+     /**
+      * @param idx Index.
+      */
+     private void failServer(int idx) {
+         ((TcpDiscoverySpi)G.ignite("server-" + idx).configuration().getDiscoverySpi()).simulateNodeFailure();
+     }
+ 
+     /**
+      * @param idx Index.
+      */
+     private void failClient(int idx) {
+         ((TcpClientDiscoverySpi)G.ignite("client-" + idx).configuration().getDiscoverySpi()).simulateNodeFailure();
+     }
+ 
+     /**
+      * @param srvCnt Number of server nodes.
+      * @param clientCnt Number of client nodes.
+      */
+     private void attachListeners(int srvCnt, int clientCnt) throws Exception {
+         if (srvJoinedLatch != null) {
+             for (int i = 0; i < srvCnt; i++) {
+                 G.ignite("server-" + i).events().localListen(new IgnitePredicate<Event>() {
+                     @Override public boolean apply(Event evt) {
+                         info("Joined event fired on server: " + evt);
+ 
+                         srvJoinedLatch.countDown();
+ 
+                         return true;
+                     }
+                 }, EVT_NODE_JOINED);
+             }
+         }
+ 
+         if (srvLeftLatch != null) {
+             for (int i = 0; i < srvCnt; i++) {
+                 G.ignite("server-" + i).events().localListen(new IgnitePredicate<Event>() {
+                     @Override public boolean apply(Event evt) {
+                         info("Left event fired on server: " + evt);
+ 
+                         srvLeftLatch.countDown();
+ 
+                         return true;
+                     }
+                 }, EVT_NODE_LEFT);
+             }
+         }
+ 
+         if (srvFailedLatch != null) {
+             for (int i = 0; i < srvCnt; i++) {
+                 G.ignite("server-" + i).events().localListen(new IgnitePredicate<Event>() {
+                     @Override public boolean apply(Event evt) {
+                         info("Failed event fired on server: " + evt);
+ 
+                         srvFailedLatch.countDown();
+ 
+                         return true;
+                     }
+                 }, EVT_NODE_FAILED);
+             }
+         }
+ 
+         if (clientJoinedLatch != null) {
+             for (int i = 0; i < clientCnt; i++) {
+                 G.ignite("client-" + i).events().localListen(new IgnitePredicate<Event>() {
+                     @Override public boolean apply(Event evt) {
+                         info("Joined event fired on client: " + evt);
+ 
+                         clientJoinedLatch.countDown();
+ 
+                         return true;
+                     }
+                 }, EVT_NODE_JOINED);
+             }
+         }
+ 
+         if (clientLeftLatch != null) {
+             for (int i = 0; i < clientCnt; i++) {
+                 G.ignite("client-" + i).events().localListen(new IgnitePredicate<Event>() {
+                     @Override public boolean apply(Event evt) {
+                         info("Left event fired on client: " + evt);
+ 
+                         clientLeftLatch.countDown();
+ 
+                         return true;
+                     }
+                 }, EVT_NODE_LEFT);
+             }
+         }
+ 
+         if (clientFailedLatch != null) {
+             for (int i = 0; i < clientCnt; i++) {
+                 G.ignite("client-" + i).events().localListen(new IgnitePredicate<Event>() {
+                     @Override public boolean apply(Event evt) {
+                         info("Failed event fired on client: " + evt);
+ 
+                         clientFailedLatch.countDown();
+ 
+                         return true;
+                     }
+                 }, EVT_NODE_FAILED);
+             }
+         }
+     }
+ 
+     /**
+      * @param srvCnt Number of server nodes.
+      * @param clientCnt Number of client nodes.
+      */
+     private void checkNodes(int srvCnt, int clientCnt) {
+         for (int i = 0; i < srvCnt; i++) {
+             Ignite g = G.ignite("server-" + i);
+ 
+             assertTrue(srvNodeIds.contains(g.cluster().localNode().id()));
+ 
+             assertFalse(g.cluster().localNode().isClient());
+ 
+             checkRemoteNodes(g, srvCnt + clientCnt - 1);
+         }
+ 
+         for (int i = 0; i < clientCnt; i++) {
+             Ignite g = G.ignite("client-" + i);
+ 
+             assertTrue(clientNodeIds.contains(g.cluster().localNode().id()));
+ 
+             assertTrue(g.cluster().localNode().isClient());
+ 
+             checkRemoteNodes(g, srvCnt + clientCnt - 1);
+         }
+     }
+ 
+     /**
+      * @param ignite Grid.
+      * @param expCnt Expected nodes count.
+      */
+     @SuppressWarnings("TypeMayBeWeakened")
+     private void checkRemoteNodes(Ignite ignite, int expCnt) {
+         Collection<ClusterNode> nodes = ignite.cluster().forRemotes().nodes();
+ 
+         assertEquals(expCnt, nodes.size());
+ 
+         for (ClusterNode node : nodes) {
+             UUID id = node.id();
+ 
+             if (clientNodeIds.contains(id))
+                 assertTrue(node.isClient());
+             else if (srvNodeIds.contains(id))
+                 assertFalse(node.isClient());
+             else
+                 assert false : "Unexpected node ID: " + id;
+         }
+     }
+ 
+     /**
+      * @param latch Latch.
+      * @throws InterruptedException If interrupted.
+      */
+     private void await(CountDownLatch latch) throws InterruptedException {
+         assertTrue("Latch count: " + latch.getCount(), latch.await(10000, MILLISECONDS));
+     }
+ 
+     /**
+      */
+     private static class MessageListener implements IgniteBiPredicate<UUID, Object> {
+         @IgniteInstanceResource
+         private Ignite ignite;
+ 
+         /** {@inheritDoc} */
+         @Override public boolean apply(UUID uuid, Object msg) {
+             X.println(">>> Received [locNodeId=" + ignite.configuration().getNodeId() + ", msg=" + msg + ']');
+ 
+             msgLatch.countDown();
+ 
+             return true;
+         }
+     }
+ 
+     /**
+      *
+      */
+     private static class TestTcpClientDiscovery extends TcpClientDiscoverySpi {
+         /** */
+         private final Object mux = new Object();
+ 
+         /** */
+         private final AtomicBoolean writeLock = new AtomicBoolean();
+ 
+         /** */
+         private final AtomicBoolean openSockLock = new AtomicBoolean();
+ 
+         /**
+          * @param lock Lock.
+          */
+         private void waitFor(AtomicBoolean lock) {
+             try {
+                 synchronized (mux) {
+                     while (lock.get())
+                         mux.wait();
+                 }
+             }
+             catch (InterruptedException e) {
+                 Thread.currentThread().interrupt();
+ 
+                 throw new RuntimeException(e);
+             }
+         }
+ 
+         /**
+          * @param isPause Is lock.
+          * @param locks Locks.
+          */
+         private void pauseResumeOperation(boolean isPause, AtomicBoolean... locks) {
+             synchronized (mux) {
+                 for (AtomicBoolean lock : locks)
+                     lock.set(isPause);
+ 
+                 mux.notifyAll();
+             }
+         }
+ 
+         /** {@inheritDoc} */
+         @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg,
+             GridByteArrayOutputStream bout) throws IOException, IgniteCheckedException {
+             waitFor(writeLock);
+ 
+             super.writeToSocket(sock, msg, bout);
+         }
+ 
+         /** {@inheritDoc} */
+         @Override protected Socket openSocket(InetSocketAddress sockAddr) throws IOException {
+             waitFor(openSockLock);
+ 
+             return super.openSocket(sockAddr);
+         }
+ 
+         /**
+          *
+          */
+         public void pauseSocketWrite() {
+             pauseResumeOperation(true, writeLock);
+         }
+ 
+         /**
+          *
+          */
+         public void pauseAll() {
+             pauseResumeOperation(true, openSockLock, writeLock);
+ 
+             brokeConnection();
+         }
+ 
+         /**
+          *
+          */
+         public void resumeAll() {
+             pauseResumeOperation(false, openSockLock, writeLock);
+         }
+     }
+ }


Mime
View raw message