ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a.@apache.org
Subject [12/13] ignite git commit: IGNITE-3153 TcpDiscoveryZookeeperIpFinder doesn't properly handle client reconnections
Date Thu, 16 Jun 2016 11:31:19 GMT
IGNITE-3153 TcpDiscoveryZookeeperIpFinder doesn't properly handle client reconnections


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e05c012f
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e05c012f
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e05c012f

Branch: refs/heads/master
Commit: e05c012f293dcad2cd3a184e6d257a0cb2af509a
Parents: 40d41c1
Author: Anton Vinogradov <av@apache.org>
Authored: Thu Jun 16 10:49:48 2016 +0300
Committer: Anton Vinogradov <av@apache.org>
Committed: Thu Jun 16 14:27:22 2016 +0300

----------------------------------------------------------------------
 .../zk/TcpDiscoveryZookeeperIpFinder.java       | 65 +++++++++++---------
 .../tcp/ipfinder/zk/ZookeeperIpFinderTest.java  | 16 ++---
 2 files changed, 42 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/e05c012f/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/zk/TcpDiscoveryZookeeperIpFinder.java
----------------------------------------------------------------------
diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/zk/TcpDiscoveryZookeeperIpFinder.java
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/zk/TcpDiscoveryZookeeperIpFinder.java
index bee4dab..238987b 100644
--- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/zk/TcpDiscoveryZookeeperIpFinder.java
+++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/zk/TcpDiscoveryZookeeperIpFinder.java
@@ -39,6 +39,7 @@ import org.apache.curator.x.discovery.details.JsonInstanceSerializer;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.typedef.internal.A;
+import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.resources.LoggerResource;
 import org.apache.ignite.spi.IgniteSpiException;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinderAdapter;
@@ -68,11 +69,8 @@ import org.codehaus.jackson.map.annotate.JsonRootName;
  *
  * @see <a href="http://zookeeper.apache.org">Apache ZooKeeper</a>
  * @see <a href="http://curator.apache.org">Apache Curator</a>
- *
- * @author Raul Kripalani
  */
 public class TcpDiscoveryZookeeperIpFinder extends TcpDiscoveryIpFinderAdapter {
-
     /** System property name to provide the ZK Connection String. */
     public static final String PROP_ZK_CONNECTION_STRING = "IGNITE_ZK_CONNECTION_STRING";
 
@@ -89,6 +87,10 @@ public class TcpDiscoveryZookeeperIpFinder extends TcpDiscoveryIpFinderAdapter
{
     @GridToStringExclude
     private final AtomicBoolean initGuard = new AtomicBoolean();
 
+    /** Close guard. */
+    @GridToStringExclude
+    private final AtomicBoolean closeGuard = new AtomicBoolean();
+
     /** Logger. */
     @LoggerResource
     private IgniteLogger log;
@@ -140,9 +142,11 @@ public class TcpDiscoveryZookeeperIpFinder extends TcpDiscoveryIpFinderAdapter
{
             curator = CuratorFrameworkFactory.newClient(zkConnectionString, retryPolicy);
         }
 
-        if (curator.getState() != CuratorFrameworkState.STARTED)
+        if (curator.getState() == CuratorFrameworkState.LATENT)
             curator.start();
 
+        A.ensure(curator.getState() == CuratorFrameworkState.STARTED, "CuratorFramework can't be started.");
+
         discovery = ServiceDiscoveryBuilder.builder(IgniteInstanceDetails.class)
             .client(curator)
             .basePath(basePath)
@@ -152,8 +156,11 @@ public class TcpDiscoveryZookeeperIpFinder extends TcpDiscoveryIpFinderAdapter
{
 
     /** {@inheritDoc} */
     @Override public void onSpiContextDestroyed() {
-        if (!initGuard.compareAndSet(true, false))
+        if (!closeGuard.compareAndSet(false, true)) {
+            U.warn(log, "ZooKeeper IP Finder can't be closed more than once.");
+
             return;
+        }
 
         log.info("Destroying ZooKeeper IP Finder.");
 
@@ -175,7 +182,8 @@ public class TcpDiscoveryZookeeperIpFinder extends TcpDiscoveryIpFinderAdapter
{
 
         try {
             serviceInstances = discovery.queryForInstances(serviceName);
-        } catch (Exception e) {
+        }
+        catch (Exception e) {
             log.warning("Error while getting registered addresses from ZooKeeper IP Finder.", e);
             return Collections.emptyList();
         }
@@ -214,17 +222,18 @@ public class TcpDiscoveryZookeeperIpFinder extends TcpDiscoveryIpFinderAdapter
{
 
             try {
                 ServiceInstance<IgniteInstanceDetails> si = ServiceInstance.<IgniteInstanceDetails>builder()
-                        .name(serviceName)
-                        .uriSpec(URI_SPEC)
-                        .address(addr.getAddress().getHostAddress())
-                        .port(addr.getPort())
-                        .build();
+                    .name(serviceName)
+                    .uriSpec(URI_SPEC)
+                    .address(addr.getAddress().getHostAddress())
+                    .port(addr.getPort())
+                    .build();
 
                 ourInstances.put(addr, si);
 
                 discovery.registerService(si);
 
-            } catch (Exception e) {
+            }
+            catch (Exception e) {
                 log.warning(String.format("Error while registering an address from ZooKeeper IP Finder " +
                     "[message=%s,addresses=%s]", e.getMessage(), addr), e);
             }
@@ -245,13 +254,14 @@ public class TcpDiscoveryZookeeperIpFinder extends TcpDiscoveryIpFinderAdapter
{
             ServiceInstance<IgniteInstanceDetails> si = ourInstances.get(addr);
             if (si == null) {
                 log.warning("Asked to unregister address from ZooKeeper IP Finder, but no match was found in local " +
-                        "instance map for: " + addrs);
+                    "instance map for: " + addrs);
                 continue;
             }
 
             try {
                 discovery.unregisterService(si);
-            } catch (Exception e) {
+            }
+            catch (Exception e) {
                 log.warning("Error while unregistering an address from ZooKeeper IP Finder: " + addr, e);
             }
         }
@@ -272,7 +282,8 @@ public class TcpDiscoveryZookeeperIpFinder extends TcpDiscoveryIpFinderAdapter
{
     }
 
     /**
-     * @param zkConnectionString ZooKeeper connection string in case a {@link CuratorFramework}
is not being set explicitly.
+     * @param zkConnectionString ZooKeeper connection string in case a {@link CuratorFramework}
is not being set
+     * explicitly.
      */
     public void setZkConnectionString(String zkConnectionString) {
         this.zkConnectionString = zkConnectionString;
@@ -286,8 +297,8 @@ public class TcpDiscoveryZookeeperIpFinder extends TcpDiscoveryIpFinderAdapter
{
     }
 
     /**
-     * @param retryPolicy {@link RetryPolicy} to use in case a ZK Connection String is being
injected, or if
-     *                    using a system property.
+     * @param retryPolicy {@link RetryPolicy} to use in case a ZK Connection String is being
injected, or if using a
+     * system property.
      */
     public void setRetryPolicy(RetryPolicy retryPolicy) {
         this.retryPolicy = retryPolicy;
@@ -315,9 +326,8 @@ public class TcpDiscoveryZookeeperIpFinder extends TcpDiscoveryIpFinderAdapter
{
     }
 
     /**
-     * @param serviceName Service name to use, as defined by Curator's {#link ServiceDiscovery}
recipe. In physical
-     *                    ZK terms, it represents the node under {@link #basePath}, under
which services will be
-     *                    registered.
+     * @param serviceName Service name to use, as defined by Curator's {@link ServiceDiscovery} recipe. In physical ZK
+     * terms, it represents the node under {@link #basePath}, under which services will be registered.
      */
     public void setServiceName(String serviceName) {
         this.serviceName = serviceName;
@@ -331,20 +341,19 @@ public class TcpDiscoveryZookeeperIpFinder extends TcpDiscoveryIpFinderAdapter
{
     }
 
     /**
-     * @param allowDuplicateRegistrations Whether to register each node only once, or if
duplicate registrations
-     *                                    are allowed. Nodes will attempt to register themselves,
plus those they
-     *                                    know about. By default, duplicate registrations
are not allowed, but you
-     *                                    might want to set this property to <tt>true</tt>
if you have multiple
-     *                                    network interfaces or if you are facing troubles.
+     * @param allowDuplicateRegistrations Whether to register each node only once, or if
duplicate registrations are
+     * allowed. Nodes will attempt to register themselves, plus those they know about. By
default, duplicate
+     * registrations are not allowed, but you might want to set this property to <tt>true</tt>
if you have multiple
+     * network interfaces or if you are facing troubles.
      */
     public void setAllowDuplicateRegistrations(boolean allowDuplicateRegistrations) {
         this.allowDuplicateRegistrations = allowDuplicateRegistrations;
     }
 
     /**
-     * Empty DTO for storing service instances details. Currently acting as a placeholder
because Curator requires
-     * a payload type when registering and discovering nodes. May be enhanced in the future
with further information
-     * to assist discovery.
+     * Empty DTO for storing service instances details. Currently acting as a placeholder
because Curator requires a
+     * payload type when registering and discovering nodes. May be enhanced in the future
with further information to
+     * assist discovery.
      *
      * @author Raul Kripalani
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/e05c012f/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/zk/ZookeeperIpFinderTest.java
----------------------------------------------------------------------
diff --git a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/zk/ZookeeperIpFinderTest.java
b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/zk/ZookeeperIpFinderTest.java
index 3f9b1ff..601323c 100644
--- a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/zk/ZookeeperIpFinderTest.java
+++ b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/zk/ZookeeperIpFinderTest.java
@@ -122,6 +122,7 @@ public class ZookeeperIpFinderTest extends GridCommonAbstractTest {
         // shall be configured through system property
         if (gridName.equals(getTestGridName(0)))
             zkIpFinder.setZkConnectionString(zkCluster.getConnectString());
+
         else if (gridName.equals(getTestGridName(1))) {
             zkIpFinder.setCurator(CuratorFrameworkFactory.newClient(zkCluster.getConnectString(),
                 new ExponentialBackoffRetry(100, 5)));
@@ -360,23 +361,16 @@ public class ZookeeperIpFinderTest extends GridCommonAbstractTest {
         // stop all grids
         stopAllGrids();
 
-        boolean wait = GridTestUtils.waitForCondition(new GridAbsPredicate() {
+        assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicate() {
             @Override public boolean apply() {
                 try {
-                    return zkCurator.getChildren().forPath(SERVICES_IGNITE_ZK_PATH).size() == 0;
+                    return 0 == zkCurator.getChildren().forPath(SERVICES_IGNITE_ZK_PATH).size();
                 }
                 catch (Exception e) {
-                    fail("Unexpected error: ");
-
-                    return true;
+                    return false;
                 }
             }
-        }, 2 * 60000);
-
-        assertTrue(wait);
-
-        // check that all nodes are gone in ZK
-        assertEquals(0, zkCurator.getChildren().forPath(SERVICES_IGNITE_ZK_PATH).size());
+        }, 20000));
     }
 
     /**


Mime
View raw message