qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From oru...@apache.org
Subject svn commit: r1617322 - in /qpid/trunk/qpid/java/bdbstore: src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/ src/test/java/org/apache/qpid/server/store/berkeleydb/replicat...
Date Mon, 11 Aug 2014 16:20:41 GMT
Author: orudyy
Date: Mon Aug 11 16:20:41 2014
New Revision: 1617322

URL: http://svn.apache.org/r1617322
Log:
QPID-5967: Set permitted nodes on a replica from application state of a master node

Modified:
    qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
    qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java
    qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java
    qpid/trunk/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeRestTest.java

Modified: qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java?rev=1617322&r1=1617321&r2=1617322&view=diff
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java (original)
+++ qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java Mon Aug 11 16:20:41 2014
@@ -1094,15 +1094,6 @@ public class ReplicatedEnvironmentFacade
         return environment;
     }
 
-    NodeState getRemoteNodeState(ReplicationNode repNode) throws IOException, ServiceConnectFailedException
-    {
-        if (repNode == null)
-        {
-            throw new IllegalArgumentException("Node cannot be null");
-        }
-        return new DbPing(repNode, (String)_configuration.getGroupName(), DB_PING_SOCKET_TIMEOUT).getNodeState();
-    }
-
     public int getNumberOfElectableGroupMembers()
     {
         if (_state.get() != State.OPEN)
@@ -1181,6 +1172,105 @@ public class ReplicatedEnvironmentFacade
         }
     }
 
+    Set<String> getPermittedNodes()
+    {
+        return Collections.unmodifiableSet(_permittedNodes);
+    }
+
+    public static NodeState getRemoteNodeState(String groupName, ReplicationNode repNode) throws IOException, ServiceConnectFailedException
+    {
+        if (repNode == null)
+        {
+            throw new IllegalArgumentException("Node cannot be null");
+        }
+        return new DbPing(repNode, groupName, DB_PING_SOCKET_TIMEOUT).getNodeState();
+    }
+
+    public static Set<String> convertApplicationStateBytesToPermittedNodeList(byte[] applicationState)
+    {
+        if (applicationState == null || applicationState.length == 0)
+        {
+            return Collections.emptySet();
+        }
+
+        ObjectMapper objectMapper = new ObjectMapper();
+        try
+        {
+            Map<String, Object> settings = objectMapper.readValue(applicationState, Map.class);
+            return new HashSet<String>((Collection<String>)settings.get(PERMITTED_NODE_LIST));
+        }
+        catch (Exception e)
+        {
+            throw new RuntimeException("Unexpected exception on de-serializing of application state", e);
+        }
+    }
+
+    public static void connectToHelperNodeAndCheckPermittedHosts(String nodeName, String hostPort, String groupName, String helperNodeName, String helperHostPort)
+    {
+        if (LOGGER.isDebugEnabled())
+        {
+            LOGGER.debug(String.format("Requesting state of the node '%s' at '%s'", helperNodeName, helperHostPort));
+        }
+
+        if (helperNodeName == null || "".equals(helperNodeName))
+        {
+            throw new IllegalConfigurationException(String.format("A helper node is not specified for node '%s'"
+                    + " joining the group '%s'", nodeName, groupName));
+        }
+
+        Collection<String> permittedNodes = null;
+        try
+        {
+            ReplicationNodeImpl node = new ReplicationNodeImpl(helperNodeName, helperHostPort);
+            NodeState state = getRemoteNodeState(groupName, node);
+            byte[] applicationState = state.getAppState();
+            permittedNodes = convertApplicationStateBytesToPermittedNodeList(applicationState);
+        }
+        catch (IOException e)
+        {
+            throw new IllegalConfigurationException(String.format("Cannot connect to '%s'", helperHostPort), e);
+        }
+        catch (ServiceConnectFailedException e)
+        {
+            throw new IllegalConfigurationException(String.format("Failure to connect to '%s'", helperHostPort), e);
+        }
+        catch (Exception e)
+        {
+            throw new RuntimeException(String.format("Unexpected exception on attempt to retrieve state from '%s' at '%s'",
+                    helperNodeName, helperHostPort), e);
+        }
+
+        if (LOGGER.isDebugEnabled())
+        {
+            LOGGER.debug(String.format("Attribute 'permittedNodes' on node '%s' is set to '%s'", helperNodeName, String.valueOf(permittedNodes)));
+        }
+
+        if (permittedNodes==null || !permittedNodes.contains(hostPort))
+        {
+            throw new IllegalConfigurationException(String.format("Node from '%s' is not permitted!", hostPort));
+        }
+    }
+
+    private void findMasterNodeStateAndApplyPermittedNodes(Collection<NodeState> nodeStates)
+    {
+        if (ReplicatedEnvironment.State.MASTER != _environment.getState())
+        {
+            for (NodeState nodeState : nodeStates)
+            {
+                if (nodeState.getNodeState() == ReplicatedEnvironment.State.MASTER)
+                {
+                    byte[] applicationState = nodeState.getAppState();
+                    Set<String> permittedNodes = convertApplicationStateBytesToPermittedNodeList(applicationState);
+                    if (!_permittedNodes.equals(permittedNodes))
+                    {
+                        setPermittedNodes(permittedNodes);
+                    }
+                    break;
+                }
+            }
+        }
+    }
+
     private void registerAppStateMonitorIfPermittedNodesSpecified()
     {
         if (!_permittedNodes.isEmpty())
@@ -1286,8 +1376,9 @@ public class ReplicatedEnvironmentFacade
                     executeDatabasePingerOnNodeChangesIfMaster(nodeStates);
 
                     notifyGroupListenerAboutNodeStates(nodeStates);
-                }
 
+                    findMasterNodeStateAndApplyPermittedNodes(nodeStates.values());
+                }
             }
             finally
             {
@@ -1384,7 +1475,7 @@ public class ReplicatedEnvironmentFacade
                         NodeState nodeStateObject = null;
                         try
                         {
-                            nodeStateObject = getRemoteNodeState(node);
+                            nodeStateObject = getRemoteNodeState((String)_configuration.getGroupName(), node);
                         }
                         catch (IOException | ServiceConnectFailedException e )
                         {

Modified: qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java?rev=1617322&r1=1617321&r2=1617322&view=diff
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java (original)
+++ qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java Mon Aug 11 16:20:41 2014
@@ -20,11 +20,9 @@
  */
 package org.apache.qpid.server.virtualhostnode.berkeleydb;
 
-import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.security.PrivilegedAction;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
@@ -43,10 +41,8 @@ import com.sleepycat.je.rep.ReplicatedEn
 import com.sleepycat.je.rep.ReplicationNode;
 import com.sleepycat.je.rep.StateChangeEvent;
 import com.sleepycat.je.rep.StateChangeListener;
-import com.sleepycat.je.rep.util.DbPing;
 import com.sleepycat.je.rep.util.ReplicationGroupAdmin;
 import com.sleepycat.je.rep.utilint.HostPortPair;
-import com.sleepycat.je.rep.utilint.ServiceDispatcher;
 import org.apache.log4j.Logger;
 
 import org.apache.qpid.server.configuration.IllegalConfigurationException;
@@ -73,7 +69,6 @@ import org.apache.qpid.server.store.berk
 import org.apache.qpid.server.util.ServerScopedRuntimeException;
 import org.apache.qpid.server.virtualhost.berkeleydb.BDBHAVirtualHostImpl;
 import org.apache.qpid.server.virtualhostnode.AbstractVirtualHostNode;
-import org.codehaus.jackson.map.ObjectMapper;
 
 @ManagedObject( category = false, type = BDBHAVirtualHostNodeImpl.VIRTUAL_HOST_NODE_TYPE
)
 public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtualHostNodeImpl>
implements
@@ -263,7 +258,7 @@ public class BDBHAVirtualHostNodeImpl ex
         {
             try
             {
-                connectToHelperNodeAndCheckPermittedHosts(getHelperNodeName(), getHelperAddress(),
getAddress());
+                ReplicatedEnvironmentFacade.connectToHelperNodeAndCheckPermittedHosts(getName(), getAddress(), getGroupName(), getHelperNodeName(), getHelperAddress());
             }
             catch(IllegalConfigurationException e)
             {
@@ -706,71 +701,6 @@ public class BDBHAVirtualHostNodeImpl ex
         return getAddress().equals(getHelperAddress());
     }
 
-    private void connectToHelperNodeAndCheckPermittedHosts(String helperNodeName, String
helperHostPort, String hostPort)
-    {
-        if (LOGGER.isDebugEnabled())
-        {
-            LOGGER.debug(String.format("Requesting state of the node '%s' at '%s'", helperNodeName,
helperHostPort));
-        }
-
-        if (_helperNodeName == null || "".equals(_helperNodeName))
-        {
-            throw new IllegalConfigurationException(String.format("An attribute '%s' is not
set in node '%s'"
-                    + " on joining the group '%s'", HELPER_NODE_NAME, getName(), getGroupName()));
-        }
-
-        Collection<String> permittedNodes = null;
-        try
-        {
-            ReplicatedEnvironmentFacade.ReplicationNodeImpl node = new ReplicatedEnvironmentFacade.ReplicationNodeImpl(helperNodeName,
helperHostPort);
-            NodeState state = new DbPing(node, getGroupName(), ReplicatedEnvironmentFacade.DB_PING_SOCKET_TIMEOUT).getNodeState();
-            byte[] applicationState = state.getAppState();
-            permittedNodes = bytesToPermittedNodeList(applicationState);
-        }
-        catch (IOException e)
-        {
-            throw new IllegalConfigurationException(String.format("Cannot connect to '%s'",
helperHostPort), e);
-        }
-        catch (ServiceDispatcher.ServiceConnectFailedException e)
-        {
-            throw new IllegalConfigurationException(String.format("Failure to connect to
'%s'", helperHostPort), e);
-        }
-        catch (Exception e)
-        {
-            throw new RuntimeException(String.format("Unexpected exception on attempt to
retrieve state from '%s' at '%s'",
-                    helperNodeName, helperHostPort), e);
-        }
-
-        if (LOGGER.isDebugEnabled())
-        {
-            LOGGER.debug(String.format("Attribute 'permittedNodes' on node '%s' is set to
'%s'", helperNodeName, String.valueOf(permittedNodes)));
-        }
-
-        if (permittedNodes != null && !permittedNodes.isEmpty() && !permittedNodes.contains(hostPort))
-        {
-            throw new IllegalConfigurationException(String.format("Node from '%s' is not
permitted!", hostPort));
-        }
-    }
-
-    private Collection<String> bytesToPermittedNodeList(byte[] applicationState)
-    {
-        if (applicationState == null || applicationState.length == 0)
-        {
-            return Collections.emptySet();
-        }
-
-        ObjectMapper objectMapper = new ObjectMapper();
-        try
-        {
-            Map<String, Object> settings = objectMapper.readValue(applicationState,
Map.class);
-            return (Collection<String>)settings.get(ReplicatedEnvironmentFacade.PERMITTED_NODE_LIST);
-        }
-        catch (Exception e)
-        {
-            throw new RuntimeException("Unexpected exception on de-serializing of application
state", e);
-        }
-    }
-
     private class RemoteNodesDiscoverer implements ReplicationGroupListener
     {
         @Override

Modified: qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java?rev=1617322&r1=1617321&r2=1617322&view=diff
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java (original)
+++ qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java Mon Aug 11 16:20:41 2014
@@ -656,7 +656,8 @@ public class ReplicatedEnvironmentFacade
         permittedNodes.add("localhost:" + getNextAvailable(TEST_NODE_PORT + 1));
         firstNode.setPermittedNodes(permittedNodes);
 
-        NodeState nodeState = firstNode.getRemoteNodeState(new ReplicatedEnvironmentFacade.ReplicationNodeImpl(TEST_NODE_NAME,
TEST_NODE_HOST_PORT));
+        ReplicatedEnvironmentFacade.ReplicationNodeImpl replicationNode = new ReplicatedEnvironmentFacade.ReplicationNodeImpl(TEST_NODE_NAME, TEST_NODE_HOST_PORT);
+        NodeState nodeState = ReplicatedEnvironmentFacade.getRemoteNodeState(TEST_GROUP_NAME, replicationNode);
 
         ObjectMapper objectMapper = new ObjectMapper();
 
@@ -708,10 +709,52 @@ public class ReplicatedEnvironmentFacade
         firstNode.setPermittedNodes(permittedNodes);
 
         String nodeName = TEST_NODE_NAME + "_1";
+        createIntruder(nodeName, node1NodeHostPort);
+        assertTrue("Intruder node was not detected", intruderLatch.await(10, TimeUnit.SECONDS));
+    }
+
+    public void testIntruderNodeDetectionOnMasterAndReplicaNodes() throws Exception
+    {
+        final CountDownLatch intruderLatch = new CountDownLatch(2);
+        ReplicationGroupListener listener = new NoopReplicationGroupListener()
+        {
+            @Override
+            public void onIntruderNode(ReplicationNode node)
+            {
+                intruderLatch.countDown();
+            }
+        };
+
+        ReplicatedEnvironmentFacade firstNode = createMaster(listener);
+        int replica1Port = getNextAvailable(TEST_NODE_PORT + 1);
+        String node2NodeHostPort = "localhost:" + replica1Port;
+        String nodeName2 = TEST_NODE_NAME + "_1";
+        ReplicatedEnvironmentFacade secondNode = createReplica(nodeName2, node2NodeHostPort, listener);
+
+        Set<String> permittedNodes = new HashSet<String>();
+        permittedNodes.add("localhost:" + TEST_NODE_PORT);
+        permittedNodes.add(nodeName2);
+        firstNode.setPermittedNodes(permittedNodes);
+
+        int counter = 0;
+        while(secondNode.getPermittedNodes().isEmpty() && counter < 100)
+        {
+            counter++;
+            Thread.sleep(50);
+        }
+        assertEquals("Permitted nodes are not set on a replica", permittedNodes, secondNode.getPermittedNodes());
+
+        int intruderPort = getNextAvailable(replica1Port+ 1);
+        createIntruder("intruder", "localhost:" + intruderPort);
+        assertTrue("Intruder node was not detected", intruderLatch.await(10, TimeUnit.SECONDS));
+    }
+
+    private void createIntruder(String nodeName, String node1NodeHostPort)
+    {
         File environmentPathFile = new File(_storePath, nodeName);
         environmentPathFile.mkdirs();
 
-        ReplicationConfig replicationConfig = new ReplicationConfig(TEST_GROUP_NAME, TEST_NODE_NAME
+ "_1", node1NodeHostPort);
+        ReplicationConfig replicationConfig = new ReplicationConfig(TEST_GROUP_NAME, nodeName, node1NodeHostPort);
         replicationConfig.setHelperHosts(TEST_NODE_HOST_PORT);
 
         EnvironmentConfig envConfig = new EnvironmentConfig();
@@ -730,7 +773,6 @@ public class ReplicatedEnvironmentFacade
                 intruder.close();
             }
         }
-        assertTrue("Intruder node was not detected", intruderLatch.await(10, TimeUnit.SECONDS));
     }
 
     private ReplicatedEnvironmentFacade createMaster() throws Exception

Modified: qpid/trunk/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeRestTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeRestTest.java?rev=1617322&r1=1617321&r2=1617322&view=diff
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeRestTest.java (original)
+++ qpid/trunk/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeRestTest.java Mon Aug 11 16:20:41 2014
@@ -302,6 +302,8 @@ public class BDBHAVirtualHostNodeRestTes
 
         int responseCode = getRestTestHelper().submitRequest(_baseNodeRestUrl + nodeName,
"PUT", nodeData);
         assertEquals("Unexpected response code for virtual host node " + nodeName + " creation
request", 201, responseCode);
+        String hostExpectedState = nodePort == helperPort ? State.ACTIVE.name(): State.UNAVAILABLE.name();
+        waitForAttributeChanged("virtualhost/" + nodeName + "/" + _hostName, BDBHAVirtualHost.STATE, hostExpectedState);
     }
 
     private Map<String, Object> createNodeAttributeMap(String nodeName, int nodePort,
int helperPort) throws Exception



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message