axis-java-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From prad...@apache.org
Subject svn commit: r612147 [5/17] - in /webservices/axis2/branches/java/jaxws21: ./ modules/adb-codegen/ modules/adb-codegen/src/org/apache/axis2/schema/ modules/adb-codegen/src/org/apache/axis2/schema/template/ modules/adb-codegen/src/org/apache/axis2/schema...
Date Tue, 15 Jan 2008 16:22:27 GMT
Modified: webservices/axis2/branches/java/jaxws21/modules/addressing/src/org/apache/axis2/handlers/addressing/AddressingOutHandler.java
URL: http://svn.apache.org/viewvc/webservices/axis2/branches/java/jaxws21/modules/addressing/src/org/apache/axis2/handlers/addressing/AddressingOutHandler.java?rev=612147&r1=612146&r2=612147&view=diff
==============================================================================
--- webservices/axis2/branches/java/jaxws21/modules/addressing/src/org/apache/axis2/handlers/addressing/AddressingOutHandler.java (original)
+++ webservices/axis2/branches/java/jaxws21/modules/addressing/src/org/apache/axis2/handlers/addressing/AddressingOutHandler.java Tue Jan 15 08:21:22 2008
@@ -38,6 +38,8 @@
 import org.apache.axis2.addressing.i18n.AddressingMessages;
 import org.apache.axis2.client.Options;
 import org.apache.axis2.context.MessageContext;
+import org.apache.axis2.description.AxisEndpoint;
+import org.apache.axis2.description.AxisService;
 import org.apache.axis2.description.Parameter;
 import org.apache.axis2.handlers.AbstractHandler;
 import org.apache.axis2.util.JavaUtils;
@@ -194,7 +196,7 @@
             String messageID = messageContextOptions.getMessageId();
             if (messageID != null && !isAddressingHeaderAlreadyAvailable(WSA_MESSAGE_ID, false))
             {//optional
-            	ArrayList attributes = (ArrayList)messageContext.getProperty(
+            	ArrayList attributes = (ArrayList)messageContext.getLocalProperty(
                         AddressingConstants.MESSAGEID_ATTRIBUTES);
                 createSOAPHeaderBlock(messageID, WSA_MESSAGE_ID, attributes);
             }
@@ -210,27 +212,31 @@
             if (action == null || action.length()==0) {
                 if (messageContext.getAxisOperation() != null) {
                     action = messageContext.getAxisOperation().getOutputAction();
+                    if(action!=null){
+                    	// Set this action back to obviate possible action mismatch problems
+                    	messageContext.setWSAAction(action);
+                    }
                     if (LoggingControl.debugLoggingAllowed && log.isTraceEnabled()) {
                         log.trace(messageContext.getLogIDString() +
                                 " processWSAAction: action from AxisOperation: " + action);
                     }
                 }
-            }
-
-            // Use the correct fault action for the selected namespace
-            if(isFinalAddressingNamespace){
-            	if(Submission.WSA_FAULT_ACTION.equals(action)){
-            		action = Final.WSA_FAULT_ACTION;
-            		messageContextOptions.setAction(action);
-            	}
             }else{
-            	if(Final.WSA_FAULT_ACTION.equals(action)){
-            		action = Submission.WSA_FAULT_ACTION;
-            		messageContextOptions.setAction(action);
-            	}else if(Final.WSA_SOAP_FAULT_ACTION.equals(action)){
-                    action = Submission.WSA_FAULT_ACTION;
-                    messageContextOptions.setAction(action);
-            	}
+	            // Use the correct fault action for the selected namespace
+	            if(isFinalAddressingNamespace){
+	            	if(Submission.WSA_FAULT_ACTION.equals(action)){
+	            		action = Final.WSA_FAULT_ACTION;
+	            		messageContextOptions.setAction(action);
+	            	}
+	            }else{
+	            	if(Final.WSA_FAULT_ACTION.equals(action)){
+	            		action = Submission.WSA_FAULT_ACTION;
+	            		messageContextOptions.setAction(action);
+	            	}else if(Final.WSA_SOAP_FAULT_ACTION.equals(action)){
+	                    action = Submission.WSA_FAULT_ACTION;
+	                    messageContextOptions.setAction(action);
+	            	}
+	            }
             }
 
             // If we need to add a wsa:Action header
@@ -257,7 +263,7 @@
                                 " processWSAAction: Adding action to header: " + action);
                     }
                     // Otherwise just add the header
-                    ArrayList attributes = (ArrayList)messageContext.getProperty(
+                    ArrayList attributes = (ArrayList)messageContext.getLocalProperty(
                             AddressingConstants.ACTION_ATTRIBUTES);
                     createSOAPHeaderBlock(action, WSA_ACTION, attributes);
                 }
@@ -362,7 +368,7 @@
                     }
                     createSOAPHeaderBlock(address, WSA_TO, epr.getAddressAttributes());
                 }
-                processToEPRReferenceInformation(epr.getAllReferenceParameters(), header);
+                processToEPRReferenceInformation(epr.getAllReferenceParameters());
             }
         }
 
@@ -432,22 +438,46 @@
          * @param parent               is the element to which the referenceparameters should be
          *                             attached
          */
-        private void processToEPRReferenceInformation(Map referenceInformation, OMElement parent) {
-            if (referenceInformation != null && parent != null) {
+        private void processToEPRReferenceInformation(Map referenceInformation) {
+            if (referenceInformation != null) {
                 if (LoggingControl.debugLoggingAllowed && log.isTraceEnabled()) {
                     log.trace("processToEPRReferenceInformation: " + referenceInformation);
                 }
                 Iterator iterator = referenceInformation.values().iterator();
                 while (iterator.hasNext()) {
                     OMElement omElement = (OMElement)iterator.next();
-                    OMElement newElement = ElementHelper.importOMElement(omElement, parent.getOMFactory());
+                    OMElement newElement = ElementHelper.importOMElement(omElement, header.getOMFactory());
                     if (isFinalAddressingNamespace) {
                         newElement.addAttribute(Final.WSA_IS_REFERENCE_PARAMETER_ATTRIBUTE,
                                                Final.WSA_TYPE_ATTRIBUTE_VALUE,
                                                addressingNamespaceObject);
                     }
-                    parent.addChild(newElement);
+                    header.addChild(newElement);
                 }
+            }
+            // Now add reference parameters we found in the WSDL (if any)
+            AxisService service = messageContext.getAxisService();
+            if(service != null){
+            	AxisEndpoint endpoint = service.getEndpoint(service.getEndpointName());
+            	if(endpoint != null){
+            		ArrayList referenceparameters = (ArrayList) endpoint.getParameterValue(REFERENCE_PARAMETER_PARAMETER);
+            		if (LoggingControl.debugLoggingAllowed && log.isTraceEnabled()) {
+            			log.trace("processToEPRReferenceInformation: Reference Parameters from WSDL:" + referenceparameters);
+            		}
+            		if(referenceparameters!=null){
+            			Iterator iterator = referenceparameters.iterator();
+            			while (iterator.hasNext()) {
+            				OMElement omElement = (OMElement)iterator.next();
+            				OMElement newElement = ElementHelper.importOMElement(omElement, header.getOMFactory());
+            				if (isFinalAddressingNamespace) {
+            					newElement.addAttribute(Final.WSA_IS_REFERENCE_PARAMETER_ATTRIBUTE,
+            							Final.WSA_TYPE_ATTRIBUTE_VALUE,
+            							addressingNamespaceObject);
+            				}
+            				header.addChild(newElement);
+            			}
+            		}
+            	}
             }
         }
 

Modified: webservices/axis2/branches/java/jaxws21/modules/clustering/pom.xml
URL: http://svn.apache.org/viewvc/webservices/axis2/branches/java/jaxws21/modules/clustering/pom.xml?rev=612147&r1=612146&r2=612147&view=diff
==============================================================================
--- webservices/axis2/branches/java/jaxws21/modules/clustering/pom.xml (original)
+++ webservices/axis2/branches/java/jaxws21/modules/clustering/pom.xml Tue Jan 15 08:21:22 2008
@@ -63,5 +63,21 @@
                 </excludes>
             </resource>
         </resources>
+        <plugins>
+            <plugin>
+                <artifactId>maven-surefire-plugin</artifactId>
+                <inherited>true</inherited>
+                <configuration>
+                    <skip>false</skip>
+                    <excludes>
+                        <exclude>**/UpdateStateTest.java</exclude>
+                        <exclude>**/ConfigurationManagerTest.java</exclude>
+                    </excludes>
+                    <includes>
+                        <include>**/*Test.java</include>
+                    </includes>
+                </configuration>
+            </plugin>
+        </plugins>
     </build>
 </project>

Modified: webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/ClusteringUtils.java
URL: http://svn.apache.org/viewvc/webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/ClusteringUtils.java?rev=612147&r1=612146&r2=612147&view=diff
==============================================================================
--- webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/ClusteringUtils.java (original)
+++ webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/ClusteringUtils.java Tue Jan 15 08:21:22 2008
@@ -25,6 +25,7 @@
 import javax.activation.DataHandler;
 import java.io.File;
 import java.io.FileOutputStream;
+import java.io.FileNotFoundException;
 import java.net.MalformedURLException;
 import java.net.URL;
 import java.util.Random;
@@ -63,6 +64,9 @@
         } else {
             serviceArchive = new File(axis2Repo + File.separator + "services" +
                                       File.separator + serviceGroupName);
+        }
+        if(!serviceArchive.exists()){
+            throw new FileNotFoundException("File " + serviceArchive + " not found");
         }
         AxisServiceGroup asGroup =
                 DeploymentEngine.loadServiceGroup(serviceArchive, configCtx);

Modified: webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/configuration/DefaultConfigurationManager.java
URL: http://svn.apache.org/viewvc/webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/configuration/DefaultConfigurationManager.java?rev=612147&r1=612146&r2=612147&view=diff
==============================================================================
--- webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/configuration/DefaultConfigurationManager.java (original)
+++ webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/configuration/DefaultConfigurationManager.java Tue Jan 15 08:21:22 2008
@@ -190,7 +190,9 @@
 
     public void setConfigurationContext(ConfigurationContext configurationContext) {
         this.configurationContext = configurationContext;
-        listener.setConfigurationContext(configurationContext);
+        if (listener != null) {
+            listener.setConfigurationContext(configurationContext);
+        }
     }
 
     public void addParameter(Parameter param) throws AxisFault {

Modified: webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/context/ClusteringContextListener.java
URL: http://svn.apache.org/viewvc/webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/context/ClusteringContextListener.java?rev=612147&r1=612146&r2=612147&view=diff
==============================================================================
--- webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/context/ClusteringContextListener.java (original)
+++ webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/context/ClusteringContextListener.java Tue Jan 15 08:21:22 2008
@@ -47,7 +47,7 @@
             try {
                 sender.sendToGroup(command);
             } catch (ClusteringFault e) {
-                log.error(e);
+                log.error("Cannot send context removed message to cluster", e);
             }
         }
     }

Modified: webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/context/ContextClusteringCommandFactory.java
URL: http://svn.apache.org/viewvc/webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/context/ContextClusteringCommandFactory.java?rev=612147&r1=612146&r2=612147&view=diff
==============================================================================
--- webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/context/ContextClusteringCommandFactory.java (original)
+++ webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/context/ContextClusteringCommandFactory.java Tue Jan 15 08:21:22 2008
@@ -19,13 +19,13 @@
 package org.apache.axis2.clustering.context;
 
 import org.apache.axiom.om.util.UUIDGenerator;
+import org.apache.axis2.clustering.ClusteringFault;
 import org.apache.axis2.clustering.context.commands.ContextClusteringCommandCollection;
+import org.apache.axis2.clustering.context.commands.DeleteServiceGroupContextCommand;
 import org.apache.axis2.clustering.context.commands.UpdateConfigurationContextCommand;
 import org.apache.axis2.clustering.context.commands.UpdateContextCommand;
 import org.apache.axis2.clustering.context.commands.UpdateServiceContextCommand;
 import org.apache.axis2.clustering.context.commands.UpdateServiceGroupContextCommand;
-import org.apache.axis2.clustering.context.commands.DeleteServiceGroupContextCommand;
-import org.apache.axis2.clustering.tribes.AckManager;
 import org.apache.axis2.context.AbstractContext;
 import org.apache.axis2.context.ConfigurationContext;
 import org.apache.axis2.context.PropertyDifference;
@@ -42,15 +42,15 @@
 import java.util.Map;
 
 /**
- * 
+ *
  */
 public final class ContextClusteringCommandFactory {
 
     private static final Log log = LogFactory.getLog(ContextClusteringCommandFactory.class);
 
     public static ContextClusteringCommandCollection
-            getCommandCollection(AbstractContext[] contexts,
-                                 Map excludedReplicationPatterns) {
+    getCommandCollection(AbstractContext[] contexts,
+                         Map excludedReplicationPatterns) {
 
         ArrayList commands = new ArrayList(contexts.length);
         ContextClusteringCommandCollection collection =
@@ -63,14 +63,12 @@
                 commands.add(cmd);
             }
         }
-        collection.setUniqueId(UUIDGenerator.getUUID());
-        AckManager.addInitialAcknowledgement(collection);
         return collection;
     }
 
     /**
-     * @param context
-     * @param excludedPropertyPatterns
+     * @param context                  The context
+     * @param excludedPropertyPatterns The property patterns to be excluded
      * @param includeAllProperties     True - Include all properties,
      *                                 False - Include only property differences
      * @return ContextClusteringCommand
@@ -79,6 +77,43 @@
                                                             Map excludedPropertyPatterns,
                                                             boolean includeAllProperties) {
 
+        UpdateContextCommand cmd = toUpdateContextCommand(context);
+        if (cmd != null) {
+            fillProperties(cmd,
+                           context,
+                           excludedPropertyPatterns,
+                           includeAllProperties);
+            if (cmd.isPropertiesEmpty()) {
+                cmd = null;
+            }
+        }
+
+        synchronized (context) {
+            context.clearPropertyDifferences(); // Once we send the diffs, we should clear the diffs
+        }
+        return cmd;
+    }
+
+
+    public static ContextClusteringCommand getUpdateCommand(AbstractContext context,
+                                                            String[] propertyNames)
+            throws ClusteringFault {
+
+        UpdateContextCommand cmd = toUpdateContextCommand(context);
+        if (cmd != null) {
+            fillProperties(cmd, context, propertyNames);
+            if (cmd.isPropertiesEmpty()) {
+                cmd = null;
+            } 
+        }
+
+        synchronized (context) {
+            context.clearPropertyDifferences(); // Once we send the diffs, we should clear the diffs
+        }
+        return cmd;
+    }
+
+    private static UpdateContextCommand toUpdateContextCommand(AbstractContext context) {
         UpdateContextCommand cmd = null;
         if (context instanceof ConfigurationContext) {
             cmd = new UpdateConfigurationContextCommand();
@@ -98,29 +133,13 @@
             updateServiceCmd.setServiceGroupContextId(serviceCtx.getServiceGroupContext().getId());
             updateServiceCmd.setServiceName(serviceCtx.getAxisService().getName());
         }
-        if (cmd != null) {
-            cmd.setUniqueId(UUIDGenerator.getUUID());
-            fillProperties(cmd,
-                           context,
-                           excludedPropertyPatterns,
-                           includeAllProperties);
-            if (cmd.isPropertiesEmpty()) {
-                cmd = null;
-            } else {
-                AckManager.addInitialAcknowledgement(cmd);
-            }
-        }
-
-        synchronized (context) {
-            context.clearPropertyDifferences(); // Once we send the diffs, we should clear the diffs
-        }
         return cmd;
     }
 
     /**
-     * @param updateCmd
-     * @param context
-     * @param excludedPropertyPatterns
+     * @param updateCmd                The command
+     * @param context                  The context
+     * @param excludedPropertyPatterns The property patterns to be excluded from replication
      * @param includeAllProperties     True - Include all properties,
      *                                 False - Include only property differences
      */
@@ -133,18 +152,18 @@
                 Map diffs = context.getPropertyDifferences();
                 for (Iterator iter = diffs.keySet().iterator(); iter.hasNext();) {
                     String key = (String) iter.next();
-                    Object prop = context.getPropertyNonReplicable(key);
+                    PropertyDifference diff = (PropertyDifference) diffs.get(key);
+                    Object value = diff.getValue();
 
-                    // First check whether it is serializable
-                    if (prop instanceof Serializable) {
+                    if (value instanceof Serializable) {
 
                         // Next check whether it matches an excluded pattern
                         if (!isExcluded(key,
                                         context.getClass().getName(),
                                         excludedPropertyPatterns)) {
-                            log.debug("sending property =" + key + "-" + prop);
-                            PropertyDifference diff = (PropertyDifference) diffs.get(key);
-                            diff.setValue(prop);
+                            if (log.isDebugEnabled()) {
+                                log.debug("sending property =" + key + "-" + value);
+                            }
                             updateCmd.addProperty(diff);
                         }
                     }
@@ -154,14 +173,15 @@
             synchronized (context) {
                 for (Iterator iter = context.getPropertyNames(); iter.hasNext();) {
                     String key = (String) iter.next();
-                    Object prop = context.getPropertyNonReplicable(key);
-                    if (prop instanceof Serializable) { // First check whether it is serializable
+                    Object value = context.getPropertyNonReplicable(key);
+                    if (value instanceof Serializable) { 
 
                         // Next check whether it matches an excluded pattern
-                        if (!isExcluded(key, context.getClass().getName(), excludedPropertyPatterns))
-                        {
-                            log.debug("sending property =" + key + "-" + prop);
-                            PropertyDifference diff = new PropertyDifference(key, prop, false);
+                        if (!isExcluded(key, context.getClass().getName(), excludedPropertyPatterns)) {
+                            if (log.isDebugEnabled()) {
+                                log.debug("sending property =" + key + "-" + value);
+                            }
+                            PropertyDifference diff = new PropertyDifference(key, value, false);
                             updateCmd.addProperty(diff);
                         }
                     }
@@ -170,24 +190,53 @@
         }
     }
 
+    private static void fillProperties(UpdateContextCommand updateCmd,
+                                       AbstractContext context,
+                                       String[] propertyNames) throws ClusteringFault {
+        synchronized (context) {
+            Map diffs = context.getPropertyDifferences();
+            for (int i = 0; i < propertyNames.length; i++) {
+                String key = propertyNames[i];
+                Object prop = context.getPropertyNonReplicable(key);
+
+                // First check whether it is serializable
+                if (prop instanceof Serializable) {
+                    if (log.isDebugEnabled()) {
+                        log.debug("sending property =" + key + "-" + prop);
+                    }
+                    PropertyDifference diff = (PropertyDifference) diffs.get(key);
+                    diff.setValue(prop);
+                    updateCmd.addProperty(diff);
+                } else {
+                    String msg =
+                            "Trying to replicate non-serializable property " + key +
+                            " in context " + context;
+                    throw new ClusteringFault(msg);
+                }
+            }
+        }
+    }
+
     private static boolean isExcluded(String propertyName,
                                       String ctxClassName,
                                       Map excludedPropertyPatterns) {
 
-        // First check in the default excludes
-        List defaultExcludes =
+        // Check in the excludes list specific to the context
+        List specificExcludes =
+                (List) excludedPropertyPatterns.get(ctxClassName);
+        boolean isExcluded = false;
+        if (specificExcludes != null) {
+            isExcluded = isExcluded(specificExcludes, propertyName);
+        }
+        if (!isExcluded) {
+            // check in the default excludes
+            List defaultExcludes =
                 (List) excludedPropertyPatterns.get(DeploymentConstants.TAG_DEFAULTS);
-        if (defaultExcludes == null) {
-            return false;
-        }
-        if (isExcluded(defaultExcludes, propertyName)) {
-            return true;
-        } else {
-            // If not, check in the excludes list specific to the context
-            List specificExcludes =
-                    (List) excludedPropertyPatterns.get(ctxClassName);
-            return isExcluded(specificExcludes, propertyName);
+            if (defaultExcludes != null) {
+                isExcluded = isExcluded(defaultExcludes, propertyName);
+            }
         }
+        return isExcluded;
     }
 
     private static boolean isExcluded(List list, String propertyName) {
@@ -214,11 +263,10 @@
         if (abstractContext instanceof ServiceGroupContext) {
             ServiceGroupContext sgCtx = (ServiceGroupContext) abstractContext;
             DeleteServiceGroupContextCommand cmd = new DeleteServiceGroupContextCommand();
-            cmd.setUniqueId(UUIDGenerator.getUUID());
             cmd.setServiceGroupContextId(sgCtx.getId());
-            
+
             return cmd;
-        } 
+        }
         return null;
     }
 }

Modified: webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/context/DefaultContextManager.java
URL: http://svn.apache.org/viewvc/webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/context/DefaultContextManager.java?rev=612147&r1=612146&r2=612147&view=diff
==============================================================================
--- webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/context/DefaultContextManager.java (original)
+++ webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/context/DefaultContextManager.java Tue Jan 15 08:21:22 2008
@@ -22,9 +22,7 @@
 import org.apache.axiom.om.OMElement;
 import org.apache.axis2.AxisFault;
 import org.apache.axis2.clustering.ClusteringFault;
-import org.apache.axis2.clustering.ClusteringConstants;
 import org.apache.axis2.clustering.context.commands.ContextClusteringCommandCollection;
-import org.apache.axis2.clustering.tribes.AckManager;
 import org.apache.axis2.clustering.tribes.ChannelSender;
 import org.apache.axis2.context.AbstractContext;
 import org.apache.axis2.context.ConfigurationContext;
@@ -32,16 +30,20 @@
 import org.apache.axis2.context.ServiceGroupContext;
 import org.apache.axis2.description.Parameter;
 
-import java.util.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
 
 public class DefaultContextManager implements ContextManager {
 
     private ConfigurationContext configContext;
+    private ContextManagerListener listener;
 
     private Map parameters = new HashMap();
 
     private ChannelSender sender;
-    private ContextReplicationProcessor processor = new ContextReplicationProcessor();
 
     private Map excludedReplicationPatterns = new HashMap();
 
@@ -53,30 +55,35 @@
     public DefaultContextManager() {
     }
 
-    public String updateContext(AbstractContext context) throws ClusteringFault {
+    public void updateContext(AbstractContext context) throws ClusteringFault {
         ContextClusteringCommand cmd =
                 ContextClusteringCommandFactory.getUpdateCommand(context,
                                                                  excludedReplicationPatterns,
                                                                  false);
         if (cmd != null) {
-            processor.process(cmd);
-            return cmd.getUniqueId();
+            sender.sendToGroup(cmd);
         }
-        return null;
     }
 
-    public String updateContexts(AbstractContext[] contexts) throws ClusteringFault {
+    public void updateContext(AbstractContext context,
+                                String[] propertyNames) throws ClusteringFault {
+        ContextClusteringCommand cmd =
+                ContextClusteringCommandFactory.getUpdateCommand(context, propertyNames);
+        if (cmd != null) {
+            sender.sendToGroup(cmd);
+        }
+    }
+
+    public void updateContexts(AbstractContext[] contexts) throws ClusteringFault {
         ContextClusteringCommandCollection cmd =
                 ContextClusteringCommandFactory.getCommandCollection(contexts,
                                                                      excludedReplicationPatterns);
-        processor.process(cmd);
-        return cmd.getUniqueId();
+        sender.sendToGroup(cmd);
     }
 
-    public String removeContext(AbstractContext context) throws ClusteringFault {
+    public void removeContext(AbstractContext context) throws ClusteringFault {
         ContextClusteringCommand cmd = ContextClusteringCommandFactory.getRemoveCommand(context);
-        processor.process(cmd);
-        return cmd.getUniqueId();
+        sender.sendToGroup(cmd);
     }
 
     public boolean isContextClusterable(AbstractContext context) {
@@ -85,22 +92,18 @@
                (context instanceof ServiceGroupContext);
     }
 
-    public boolean isMessageAcknowledged(String messageUniqueId) throws ClusteringFault {
-        return AckManager.isMessageAcknowledged(messageUniqueId, sender);
-    }
-
-    public void process(ContextClusteringCommand command) throws ClusteringFault {
-        command.execute(configContext);
-    }
-
     public void setContextManagerListener(ContextManagerListener listener) {
+        this.listener = listener;
         if (configContext != null) {
-            listener.setConfigurationContext(configContext);
+            this.listener.setConfigurationContext(configContext);
         }
     }
 
     public void setConfigurationContext(ConfigurationContext configurationContext) {
         this.configContext = configurationContext;
+        if (listener != null) {
+            listener.setConfigurationContext(configContext);
+        }
     }
 
     public void setReplicationExcludePatterns(String contextType, List patterns) {
@@ -140,43 +143,4 @@
         throw new UnsupportedOperationException();
     }
     // ---------------------------------------------------------------------------------------------
-
-    private class ContextReplicationProcessor {
-        public void process(final ContextClusteringCommand cmd) throws ClusteringFault {
-
-            // If the sender is NULL, it means the TribesClusterManager is still being initialized
-            // So we need to busy wait.
-            if (sender == null) {
-                Thread processorThread = new Thread("ProcessorThread") {
-                    public void run() {
-                        do {
-                            try {
-                                Thread.sleep(300);
-                            } catch (InterruptedException e) {
-                                e.printStackTrace();
-                            }
-                        } while (sender == null);
-                        try {
-                            long tts = sender.sendToGroup(cmd);
-                            configContext.setNonReplicableProperty(ClusteringConstants.TIME_TO_SEND,
-                                                                   new Long(tts));
-                        } catch (ClusteringFault clusteringFault) {
-                            AckManager.removeMessage(cmd.getUniqueId());
-                            throw new RuntimeException(clusteringFault);
-                        }
-                    }
-                };
-                processorThread.start();
-            } else {
-                try {
-                    long tts = sender.sendToGroup(cmd);
-                    configContext.setNonReplicableProperty(ClusteringConstants.TIME_TO_SEND,
-                                                           new Long(tts));
-                } catch (ClusteringFault clusteringFault) {
-                    AckManager.removeMessage(cmd.getUniqueId());
-                    throw clusteringFault;
-                }
-            }
-        }
-    }
 }

Modified: webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/context/DefaultContextManagerListener.java
URL: http://svn.apache.org/viewvc/webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/context/DefaultContextManagerListener.java?rev=612147&r1=612146&r2=612147&view=diff
==============================================================================
--- webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/context/DefaultContextManagerListener.java (original)
+++ webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/context/DefaultContextManagerListener.java Tue Jan 15 08:21:22 2008
@@ -27,12 +27,9 @@
 public class DefaultContextManagerListener implements ContextManagerListener {
 
     private ConfigurationContext configurationContext;
-    private static final Log log = LogFactory.getLog(DefaultContextManagerListener.class);
 
     public void contextUpdated(ContextClusteringCommand message) throws ClusteringFault {
-        log.debug("Enter: DefaultContextManagerListener::contextRemoved");
         message.execute(configurationContext);
-        log.debug("Exit: DefaultContextManagerListener::contextRemoved");
     }
 
     public void setConfigurationContext(ConfigurationContext configurationContext) {

Modified: webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/context/PropertyUpdater.java
URL: http://svn.apache.org/viewvc/webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/context/PropertyUpdater.java?rev=612147&r1=612146&r2=612147&view=diff
==============================================================================
--- webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/context/PropertyUpdater.java (original)
+++ webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/context/PropertyUpdater.java Tue Jan 15 08:21:22 2008
@@ -36,7 +36,9 @@
     private Map properties;
 
     public void updateProperties(AbstractContext abstractContext) {
-        log.debug("Updating props in " + abstractContext);
+        if (log.isDebugEnabled()) {
+            log.debug("Updating props in " + abstractContext);
+        }
         if (abstractContext != null) {
             for (Iterator iter = properties.keySet().iterator(); iter.hasNext();) {
                 String key = (String) iter.next();
@@ -46,8 +48,10 @@
                     abstractContext.removePropertyNonReplicable(key);
                 } else {  // it is updated/added
                     abstractContext.setNonReplicableProperty(key, propDiff.getValue());
-                    log.debug("Added prop=" + key + ", value=" + propDiff.getValue() +
-                              " to context " + abstractContext);
+                    if (log.isDebugEnabled()) {
+                        log.debug("Added prop=" + key + ", value=" + propDiff.getValue() +
+                                  " to context " + abstractContext);
+                    }
                 }
             }
         }

Modified: webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/context/commands/ContextClusteringCommandCollection.java
URL: http://svn.apache.org/viewvc/webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/context/commands/ContextClusteringCommandCollection.java?rev=612147&r1=612146&r2=612147&view=diff
==============================================================================
--- webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/context/commands/ContextClusteringCommandCollection.java (original)
+++ webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/context/commands/ContextClusteringCommandCollection.java Tue Jan 15 08:21:22 2008
@@ -45,6 +45,6 @@
     }
 
     public String toString() {
-        return "ContextClusteringCommandCollection(" + uniqueId + ")";
+        return "ContextClusteringCommandCollection";
     }
 }

Modified: webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/context/commands/UpdateConfigurationContextCommand.java
URL: http://svn.apache.org/viewvc/webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/context/commands/UpdateConfigurationContextCommand.java?rev=612147&r1=612146&r2=612147&view=diff
==============================================================================
--- webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/context/commands/UpdateConfigurationContextCommand.java (original)
+++ webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/context/commands/UpdateConfigurationContextCommand.java Tue Jan 15 08:21:22 2008
@@ -31,6 +31,6 @@
     }
 
     public String toString() {
-        return "UpdateConfigurationContextCommand(" + uniqueId + ")";
+        return "UpdateConfigurationContextCommand";
     }
 }

Modified: webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/context/commands/UpdateContextCommand.java
URL: http://svn.apache.org/viewvc/webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/context/commands/UpdateContextCommand.java?rev=612147&r1=612146&r2=612147&view=diff
==============================================================================
--- webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/context/commands/UpdateContextCommand.java (original)
+++ webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/context/commands/UpdateContextCommand.java Tue Jan 15 08:21:22 2008
@@ -42,7 +42,7 @@
     public void addProperty(PropertyDifference diff) {
         if (propertyUpdater.getProperties() == null) {
             propertyUpdater.setProperties(new HashMap());
-        }
+        }                                        
         propertyUpdater.addContextProperty(diff);
     }
 }

Modified: webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/context/commands/UpdateServiceContextCommand.java
URL: http://svn.apache.org/viewvc/webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/context/commands/UpdateServiceContextCommand.java?rev=612147&r1=612146&r2=612147&view=diff
==============================================================================
--- webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/context/commands/UpdateServiceContextCommand.java (original)
+++ webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/context/commands/UpdateServiceContextCommand.java Tue Jan 15 08:21:22 2008
@@ -53,7 +53,9 @@
     }
 
     public void execute(ConfigurationContext configurationContext) throws ClusteringFault {
-        log.debug("Updating service context properties...");
+        if (log.isDebugEnabled()) {
+            log.debug("Updating service context properties...");
+        }
         ServiceGroupContext sgCtx =
                 configurationContext.getServiceGroupContext(serviceGroupContextId);
         if (sgCtx != null) {
@@ -100,6 +102,6 @@
     }
 
     public String toString() {
-        return "UpdateServiceContextCommand(" + uniqueId + ")";
+        return "UpdateServiceContextCommand";
     }
 }

Modified: webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/context/commands/UpdateServiceGroupContextCommand.java
URL: http://svn.apache.org/viewvc/webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/context/commands/UpdateServiceGroupContextCommand.java?rev=612147&r1=612146&r2=612147&view=diff
==============================================================================
--- webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/context/commands/UpdateServiceGroupContextCommand.java (original)
+++ webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/context/commands/UpdateServiceGroupContextCommand.java Tue Jan 15 08:21:22 2008
@@ -66,11 +66,13 @@
             sgCtx.setId(serviceGroupContextId);
             configContext.addServiceGroupContextIntoSoapSessionTable(sgCtx);  // TODO: Check this
         }
-        log.debug("###### Gonna update SG prop in " + serviceGroupContextId + "===" + sgCtx);
+        if (log.isDebugEnabled()) {
+            log.debug("Gonna update SG prop in " + serviceGroupContextId + "===" + sgCtx);
+        }
         propertyUpdater.updateProperties(sgCtx);
     }
 
     public String toString() {
-        return "UpdateServiceGroupContextCommand(" + uniqueId + ")";
+        return "UpdateServiceGroupContextCommand";
     }
 }

Modified: webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/control/GetConfigurationCommand.java
URL: http://svn.apache.org/viewvc/webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/control/GetConfigurationCommand.java?rev=612147&r1=612146&r2=612147&view=diff
==============================================================================
--- webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/control/GetConfigurationCommand.java (original)
+++ webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/control/GetConfigurationCommand.java Tue Jan 15 08:21:22 2008
@@ -17,6 +17,8 @@
 
 import org.apache.axis2.clustering.ClusteringFault;
 import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.description.AxisModule;
+import org.apache.axis2.description.AxisService;
 import org.apache.axis2.description.AxisServiceGroup;
 import org.apache.axis2.engine.AxisConfiguration;
 
@@ -37,7 +39,20 @@
         AxisConfiguration axisConfig = configCtx.getAxisConfiguration();
         for (Iterator iter = axisConfig.getServiceGroups(); iter.hasNext();) {
             AxisServiceGroup serviceGroup = (AxisServiceGroup) iter.next();
-            serviceGroupNames.add(serviceGroup.getServiceGroupName());
+            boolean excludeSG = false;
+            for (Iterator serviceIter = serviceGroup.getServices(); serviceIter.hasNext();) {
+                AxisService service = (AxisService) serviceIter.next();
+                if (service.getParameter(AxisModule.MODULE_SERVICE) != null ||
+                    service.isClientSide()) { // No need to send services deployed through modules or client side services
+                    excludeSG = true;
+                    break;
+                }
+            }
+
+            //TODO: Exclude all services loaded from modules. How to handle data services etc.?
+            if (!excludeSG) {
+                serviceGroupNames.add(serviceGroup.getServiceGroupName());
+            }
         }
         this.serviceGroupNames =
                 (String[]) serviceGroupNames.toArray(new String[serviceGroupNames.size()]);

Modified: webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/control/GetConfigurationResponseCommand.java
URL: http://svn.apache.org/viewvc/webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/control/GetConfigurationResponseCommand.java?rev=612147&r1=612146&r2=612147&view=diff
==============================================================================
--- webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/control/GetConfigurationResponseCommand.java (original)
+++ webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/control/GetConfigurationResponseCommand.java Tue Jan 15 08:21:22 2008
@@ -24,6 +24,8 @@
 import org.apache.axis2.description.AxisService;
 import org.apache.axis2.description.AxisModule;
 import org.apache.axis2.AxisFault;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
 import java.io.FileNotFoundException;
 import java.util.Iterator;
@@ -33,6 +35,8 @@
  */
 public class GetConfigurationResponseCommand extends ControlCommand {
 
+    private static final Log log = LogFactory.getLog(GetConfigurationResponseCommand.class);
+
     private String[] serviceGroups;
 
     public void execute(ConfigurationContext configContext) throws ClusteringFault {
@@ -40,7 +44,10 @@
 
         // Run this code only if this node is not already initialized
         if (configContext.
-                getPropertyNonReplicable(ClusteringConstants.CLUSTER_INITIALIZED) == null) {
+                getPropertyNonReplicable(ClusteringConstants.RECD_CONFIG_INIT_MSG) == null) {
+            log.info("Received configuration initialization message");
+            configContext.
+                setNonReplicableProperty(ClusteringConstants.RECD_CONFIG_INIT_MSG, "true");
             if (serviceGroups != null) {
 
                 // Load all the service groups that are sent by the neighbour
@@ -59,6 +66,8 @@
                     }
                 }
 
+                //TODO: We support only AAR files for now
+
                 // Unload all service groups which were not sent by the neighbour,
                 // but have been currently loaded
                 for (Iterator iter = axisConfig.getServiceGroups(); iter.hasNext();) {
@@ -79,9 +88,9 @@
                              serviceIter.hasNext();) {
                             AxisService service = (AxisService) serviceIter.next();
                             if (service.isClientSide() ||
-                                service.getParameter(AxisModule.MODULE_SERVICE) != null) {
+                                service.getParameter(AxisModule.MODULE_SERVICE) != null) { // Do not unload service groups containing client side services or ones deployed from within modules
                                 mustUnloadServiceGroup = false;
-                                break; // Do not unload service groups containing client side services
+                                break;
                             }
                         }
                         if (mustUnloadServiceGroup) {

Modified: webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/control/GetStateResponseCommand.java
URL: http://svn.apache.org/viewvc/webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/control/GetStateResponseCommand.java?rev=612147&r1=612146&r2=612147&view=diff
==============================================================================
--- webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/control/GetStateResponseCommand.java (original)
+++ webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/control/GetStateResponseCommand.java Tue Jan 15 08:21:22 2008
@@ -22,24 +22,30 @@
 import org.apache.axis2.clustering.ClusteringFault;
 import org.apache.axis2.clustering.context.ContextClusteringCommand;
 import org.apache.axis2.context.ConfigurationContext;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
 /**
  * 
  */
 public class GetStateResponseCommand extends ControlCommand {
 
-    private ContextClusteringCommand[] commands;
+    private static final Log log = LogFactory.getLog(GetStateResponseCommand.class);
 
-    public void execute(ConfigurationContext configurationContext) throws ClusteringFault {
+    private ContextClusteringCommand[] commands;
 
+    public void execute(ConfigurationContext configContext) throws ClusteringFault {
+        log.info("Received state initialization message");
+        
         // Run this code only if this node is not already initialized
-        if (configurationContext.
-                getPropertyNonReplicable(ClusteringConstants.CLUSTER_INITIALIZED) == null) {
-            configurationContext.
-                    setNonReplicableProperty(ClusteringConstants.CLUSTER_INITIALIZED, "true");
+        if (configContext.
+                getPropertyNonReplicable(ClusteringConstants.RECD_STATE_INIT_MSG) == null) {
+            configContext.
+                setNonReplicableProperty(ClusteringConstants.RECD_STATE_INIT_MSG, "true");
+//            log.info("Received state initialization message");
             if (commands != null) {
                 for (int i = 0; i < commands.length; i++) {
-                    commands[i].execute(configurationContext);
+                    commands[i].execute(configContext);
                 }
             }
         }

Copied: webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/tribes/AtMostOnceInterceptor.java (from r611653, webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/AtMostOnceInterceptor.java)
URL: http://svn.apache.org/viewvc/webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/tribes/AtMostOnceInterceptor.java?p2=webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/tribes/AtMostOnceInterceptor.java&p1=webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/AtMostOnceInterceptor.java&r1=611653&r2=612147&rev=612147&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/AtMostOnceInterceptor.java (original)
+++ webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/tribes/AtMostOnceInterceptor.java Tue Jan 15 08:21:22 2008
@@ -15,45 +15,21 @@
  */
 package org.apache.axis2.clustering.tribes;
 
-import org.apache.catalina.tribes.ChannelException;
-import org.apache.catalina.tribes.ChannelInterceptor;
 import org.apache.catalina.tribes.ChannelMessage;
-import org.apache.catalina.tribes.Member;
-import org.apache.catalina.tribes.RemoteProcessException;
-import org.apache.catalina.tribes.Channel;
-import org.apache.catalina.tribes.ByteMessage;
-import org.apache.catalina.tribes.util.UUIDGenerator;
-import org.apache.catalina.tribes.io.ChannelData;
-import org.apache.catalina.tribes.io.XByteBuffer;
-import org.apache.catalina.tribes.membership.Membership;
-import org.apache.catalina.tribes.membership.MemberImpl;
 import org.apache.catalina.tribes.group.ChannelInterceptorBase;
-import org.apache.catalina.tribes.group.InterceptorPayload;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.axis2.engine.AxisConfiguration;
-import org.apache.axis2.description.AxisServiceGroup;
-import org.apache.axis2.description.AxisModule;
 
+import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.Arrays;
-import java.util.Map;
-import java.util.TimerTask;
-import java.util.Timer;
 import java.util.Iterator;
 import java.util.List;
-import java.util.ArrayList;
-import java.net.Socket;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.SocketTimeoutException;
-import java.net.ConnectException;
-import java.io.Serializable;
+import java.util.Map;
 
 /**
  * Message intereceptor for handling at-most-once message processing semantics
  */
-public class AtMostOnceInterceptor extends ChannelInterceptorBase {
+public class AtMostOnceInterceptor extends ChannelInterceptorBase {          
 
     private static Log log = LogFactory.getLog(AtMostOnceInterceptor.class);
     private static final Map receivedMessages = new HashMap();
@@ -61,39 +37,61 @@
     /**
      * The time a message lives in the receivedMessages Map
      */
-    private static final int TIMEOUT = 60 * 1000;
+    private static final int TIMEOUT = 5 * 60 * 1000;
 
     public AtMostOnceInterceptor() {
+        Thread cleanupThread = new Thread(new MessageCleanupTask());
+        cleanupThread.setPriority(Thread.MIN_PRIORITY);
+        cleanupThread.start();
+    }
 
-        TimerTask cleanupTask = new TimerTask() {
-            public void run() {
-                List toBeRemoved = new ArrayList();
-                for (Iterator iterator = receivedMessages.keySet().iterator();
-                     iterator.hasNext();) {
-                    ChannelMessage msg = (ChannelMessage) iterator.next();
-                    long arrivalTime = ((Long) receivedMessages.get(msg)).longValue();
-                    if (System.currentTimeMillis() - arrivalTime >= TIMEOUT) {
-                        toBeRemoved.add(msg);
-                    }
+    public void messageReceived(ChannelMessage msg) {
+        synchronized (receivedMessages) {
+            if (receivedMessages.get(msg) == null) {  // If it is a new message, keep track of it
+                receivedMessages.put(msg, new Long(System.currentTimeMillis()));
+                super.messageReceived(msg);
+            } else {  // If it is a duplicate message, discard it. i.e. dont call super.messageReceived
+                log.info("Duplicate message received from " + TribesUtil.getHost(msg.getAddress()));
+            }
+        }
+    }
+
+    private class MessageCleanupTask implements Runnable {
+
+        public void run() {
+            while (true) {
+                try {
+                    Thread.sleep(TIMEOUT);
+                } catch (InterruptedException e) {
+                    e.printStackTrace();
                 }
-                for (Iterator iterator = toBeRemoved.iterator(); iterator.hasNext();) {
-                    ChannelMessage msg = (ChannelMessage) iterator.next();
-                    receivedMessages.remove(msg);
-                    if (log.isDebugEnabled()) {
-                        log.debug("Cleaned up message ");
+                try {
+                    List toBeRemoved = new ArrayList();
+                    Thread.yield();
+                    synchronized (receivedMessages) {
+                        for (Iterator iterator = receivedMessages.keySet().iterator();
+                             iterator.hasNext();) {
+                            ChannelMessage msg = (ChannelMessage) iterator.next();
+                            long arrivalTime = ((Long) receivedMessages.get(msg)).longValue();
+                            if (System.currentTimeMillis() - arrivalTime >= TIMEOUT) {
+                                toBeRemoved.add(msg);
+                                if(toBeRemoved.size() > 10000){ // Do not allow this thread to run for too long
+                                    break;
+                                }
+                            }
+                        }
+                        for (Iterator iterator = toBeRemoved.iterator(); iterator.hasNext();) {
+                            ChannelMessage msg = (ChannelMessage) iterator.next();
+                            receivedMessages.remove(msg);
+                            if (log.isDebugEnabled()) {
+                                log.debug("Cleaned up message ");
+                            }
+                        }
                     }
+                } catch (Exception e) {
+                    log.error("Exception occurred while trying to cleanup messages", e);
                 }
             }
-        };
-        new Timer().scheduleAtFixedRate(cleanupTask, TIMEOUT, TIMEOUT);
-    }
-
-    public void messageReceived(ChannelMessage msg) {
-        super.messageReceived(msg);
-        if (receivedMessages.get(msg) == null) {  // If it is a new message, keep track of it
-            receivedMessages.put(msg, new Long(System.currentTimeMillis()));
-        } else {  // If it is a duplicate message, discard it. i.e. dont call super.messageReceived
-            log.info("Duplicate message received from " + TribesUtil.getHost(msg.getAddress()));
         }
     }
 }

Modified: webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelListener.java
URL: http://svn.apache.org/viewvc/webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelListener.java?rev=612147&r1=612146&r2=612147&view=diff
==============================================================================
--- webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelListener.java (original)
+++ webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelListener.java Tue Jan 15 08:21:22 2008
@@ -25,9 +25,6 @@
 import org.apache.axis2.clustering.configuration.DefaultConfigurationManager;
 import org.apache.axis2.clustering.context.ContextClusteringCommand;
 import org.apache.axis2.clustering.context.DefaultContextManager;
-import org.apache.axis2.clustering.context.commands.ContextClusteringCommandCollection;
-import org.apache.axis2.clustering.context.commands.UpdateContextCommand;
-import org.apache.axis2.clustering.control.AckCommand;
 import org.apache.axis2.clustering.control.ControlCommand;
 import org.apache.axis2.clustering.control.GetConfigurationResponseCommand;
 import org.apache.axis2.clustering.control.GetStateResponseCommand;
@@ -37,6 +34,8 @@
 import org.apache.axis2.engine.AxisConfiguration;
 import org.apache.catalina.tribes.ByteMessage;
 import org.apache.catalina.tribes.Member;
+import org.apache.catalina.tribes.RemoteProcessException;
+import org.apache.catalina.tribes.group.RpcMessage;
 import org.apache.catalina.tribes.io.XByteBuffer;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -51,20 +50,17 @@
 
     private DefaultContextManager contextManager;
     private DefaultConfigurationManager configurationManager;
-    private TribesControlCommandProcessor controlCommandProcessor;
-    private ChannelSender channelSender;
+    private ControlCommandProcessor controlCommandProcessor;
 
     private ConfigurationContext configurationContext;
 
     public ChannelListener(ConfigurationContext configurationContext,
                            DefaultConfigurationManager configurationManager,
                            DefaultContextManager contextManager,
-                           TribesControlCommandProcessor controlCommandProcessor,
-                           ChannelSender sender) {
+                           ControlCommandProcessor controlCommandProcessor) {
         this.configurationManager = configurationManager;
         this.contextManager = contextManager;
         this.controlCommandProcessor = controlCommandProcessor;
-        this.channelSender = sender;
         this.configurationContext = configurationContext;
     }
 
@@ -80,10 +76,21 @@
         this.configurationContext = configurationContext;
     }
 
+    /**
+     * Invoked by the channel to determine if the listener will process this message or not.
+     * @param msg Serializable
+     * @param sender Member
+     * @return boolean
+     */
     public boolean accept(Serializable msg, Member sender) {
-        return true;
+        return !(msg instanceof RpcMessage);  // RpcMessages  will not be handled by this listener
     }
 
+    /**
+     * Receive a message from the channel
+     * @param msg Serializable
+     * @param sender - the source of the message
+     */
     public void messageReceived(Serializable msg, Member sender) {
         try {
             AxisConfiguration configuration = configurationContext.getAxisConfiguration();
@@ -98,58 +105,45 @@
                 AxisModule module = (AxisModule) iter.next();
                 classLoaders.add(module.getModuleClassLoader());
             }
-
-
             byte[] message = ((ByteMessage) msg).getMessage();
             msg = XByteBuffer.deserialize(message,
                                           0,
                                           message.length,
-                                          (ClassLoader[])classLoaders.toArray(new ClassLoader[classLoaders.size()])); 
+                                          (ClassLoader[]) classLoaders.toArray(new ClassLoader[classLoaders.size()]));
         } catch (Exception e) {
-            log.error(e);
+            String errMsg = "Cannot deserialize received message";
+            log.error(errMsg, e);
+            throw new RemoteProcessException(errMsg, e);
         }
 
         // If the system has not still been intialized, reject all incoming messages, except the
         // GetStateResponseCommand message
         if (configurationContext.
-                getPropertyNonReplicable(ClusteringConstants.CLUSTER_INITIALIZED) == null
-            && !(msg instanceof GetStateResponseCommand) &&
-            !(msg instanceof GetConfigurationResponseCommand)) {
-
-            log.warn("Received message before cluster initialization has been completed");
+                getPropertyNonReplicable(ClusteringConstants.CLUSTER_INITIALIZED) == null) {
+            log.warn("Received message " + msg +
+                     " before cluster initialization has been completed from " +
+                     TribesUtil.getHost(sender));
             return;
         }
-        log.debug("Received message " + msg + " from " + TribesUtil.getHost(sender));
+        if (log.isDebugEnabled()) {
+            log.debug("Received message " + msg + " from " + TribesUtil.getHost(sender));
+        }
+
         try {
             processMessage(msg, sender);
         } catch (Exception e) {
-            log.error(e);
+            String errMsg = "Cannot process received message";
+            log.error(errMsg, e);
+            throw new RemoteProcessException(errMsg, e);
         }
     }
 
     private void processMessage(Serializable msg, Member sender) throws ClusteringFault {
-        //TODO: Reject duplicates that can be received due to retransmissions
-        //TODO: ACK implosion?
-
         if (msg instanceof ContextClusteringCommand && contextManager != null) {
             ContextClusteringCommand ctxCmd = (ContextClusteringCommand) msg;
-            contextManager.process(ctxCmd);
-
-            // Sending ACKs for ContextClusteringCommandCollection or
-            // UpdateContextCommand is sufficient
-            if (msg instanceof ContextClusteringCommandCollection ||
-                msg instanceof UpdateContextCommand) {
-                AckCommand ackCmd = new AckCommand(ctxCmd.getUniqueId());
-
-                // Send the ACK
-                this.channelSender.sendToMember(ackCmd, sender);
-            }
-        } else if (msg instanceof ConfigurationClusteringCommand &&
-                   configurationManager != null) {
+            ctxCmd.execute(configurationContext);
+        } else if (msg instanceof ConfigurationClusteringCommand && configurationManager != null) {
             configurationManager.process((ConfigurationClusteringCommand) msg);
-        } else if (msg instanceof ControlCommand && controlCommandProcessor != null) {
-            controlCommandProcessor.process((ControlCommand) msg,
-                                            sender);
-        }
+        } 
     }
 }

Modified: webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java
URL: http://svn.apache.org/viewvc/webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java?rev=612147&r1=612146&r2=612147&view=diff
==============================================================================
--- webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java (original)
+++ webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java Tue Jan 15 08:21:22 2008
@@ -24,6 +24,7 @@
 import org.apache.axis2.clustering.MessageSender;
 import org.apache.catalina.tribes.ByteMessage;
 import org.apache.catalina.tribes.Channel;
+import org.apache.catalina.tribes.ChannelException;
 import org.apache.catalina.tribes.Member;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -37,39 +38,58 @@
 
     private Log log = LogFactory.getLog(ChannelSender.class);
     private Channel channel;
+    private boolean synchronizeAllMembers;
+    private MembershipManager membershipManager;
 
-    public long sendToGroup(ClusteringCommand msg) throws ClusteringFault {
+    public ChannelSender(Channel channel,
+                         MembershipManager membershipManager,
+                         boolean synchronizeAllMembers) {
+        this.channel = channel;
+        this.membershipManager = membershipManager;
+        this.synchronizeAllMembers = synchronizeAllMembers;
+    }
+
+    public void sendToGroup(ClusteringCommand msg) throws ClusteringFault {
         if (channel == null) {
-            return 0;
+            return;
         }
-        long timeToSend = 0;
+        Member[] members = membershipManager.getMembers();
 
         // Keep retrying, since at the point of trying to send the msg, a member may leave the group
         // causing a view change. All nodes in a view should get the msg
-        //TODO: Sometimes Tribes incorrectly detects that a member has left a group
-        while (true) {
-            if (channel.getMembers().length > 0) {
-                try {
-                    long start = System.currentTimeMillis();
-                    channel.send(channel.getMembers(), toByteMessage(msg), Channel.SEND_OPTIONS_USE_ACK);
-                    timeToSend = System.currentTimeMillis() - start;
+        if (members.length > 0) {
+            try {
+                if (synchronizeAllMembers) {
+                    channel.send(members, toByteMessage(msg),
+                                 Channel.SEND_OPTIONS_USE_ACK |
+                                 Channel.SEND_OPTIONS_SYNCHRONIZED_ACK |
+                                 TribesClusterManager.MSG_ORDER_OPTION);
+                } else {
+                    channel.send(members, toByteMessage(msg), Channel.SEND_OPTIONS_ASYNCHRONOUS);
+                }
+                if (log.isDebugEnabled()) {
                     log.debug("Sent " + msg + " to group");
-                    break;
-                } catch (NotSerializableException e) {
-                    String message = "Could not send command message " + msg +
-                                     " to group since it is not serializable.";
-                    log.error(message, e);
-                    throw new ClusteringFault(message, e);
-                } catch (Exception e) {
-                    String message = "Error sending command message : " + msg +
-                                     ". Reason " + e.getMessage();
-                    log.warn(message, e);
                 }
-            } else {
-                break;
+            } catch (NotSerializableException e) {
+                String message = "Could not send command message " + msg +
+                                 " to group since it is not serializable.";
+                log.error(message, e);
+                throw new ClusteringFault(message, e);
+            } catch (ChannelException e) {
+                log.error("Could not send message to some members", e);
+                ChannelException.FaultyMember[] faultyMembers = e.getFaultyMembers();
+                for (int i = 0; i < faultyMembers.length; i++) {
+                    ChannelException.FaultyMember faultyMember = faultyMembers[i];
+                    Member member = faultyMember.getMember();
+                    log.error("Member " + TribesUtil.getHost(member) + " is faulty",
+                              faultyMember.getCause());
+                }
+            } catch (Exception e) {
+                String message = "Error sending command message : " + msg +
+                                 ". Reason " + e.getMessage();
+                log.warn(message, e);
             }
         }
-        return timeToSend;
     }
 
     private ByteMessage toByteMessage(ClusteringCommand msg) throws IOException {
@@ -89,39 +109,37 @@
             channel.send(new Member[]{channel.getLocalMember(true)},
                          toByteMessage(msg),
                          Channel.SEND_OPTIONS_USE_ACK);
-            log.debug("Sent " + msg + " to self");
+            if (log.isDebugEnabled()) {
+                log.debug("Sent " + msg + " to self");
+            }
         } catch (Exception e) {
             throw new ClusteringFault(e);
         }
     }
 
-    public long sendToMember(ClusteringCommand cmd, Member member) throws ClusteringFault {
-        long timeToSend = 0;
+    public void sendToMember(ClusteringCommand cmd, Member member) throws ClusteringFault {
         try {
             if (member.isReady()) {
-                long start = System.currentTimeMillis();
-                channel.send(new Member[]{member}, toByteMessage(cmd), Channel.SEND_OPTIONS_USE_ACK);
-                timeToSend = System.currentTimeMillis() - start;
-                log.debug("Sent " + cmd + " to " + TribesUtil.getHost(member));
+                channel.send(new Member[]{member}, toByteMessage(cmd),
+                             Channel.SEND_OPTIONS_USE_ACK | Channel.SEND_OPTIONS_SYNCHRONIZED_ACK);
+                if (log.isDebugEnabled()) {
+                    log.debug("Sent " + cmd + " to " + TribesUtil.getHost(member));
+                }
             }
         } catch (NotSerializableException e) {
-            String message = "Could not send command message to " + TribesUtil.getHost(member)  +
+            String message = "Could not send command message to " + TribesUtil.getHost(member) +
                              " since it is not serializable.";
             log.error(message, e);
             throw new ClusteringFault(message, e);
+        } catch (ChannelException e) {
+            log.error("Could not send message to " + TribesUtil.getHost(member));
+            ChannelException.FaultyMember[] faultyMembers = e.getFaultyMembers();
+            log.error("Member " + TribesUtil.getHost(member) + " is faulty",
+                      faultyMembers[0].getCause());
         } catch (Exception e) {
             String message = "Could not send message to " + TribesUtil.getHost(member) +
                              ". Reason " + e.getMessage();
             log.warn(message, e);
         }
-        return timeToSend;
-    }
-
-    public Channel getChannel() {
-        return channel;
-    }
-
-    public void setChannel(Channel channel) {
-        this.channel = channel;
     }
 }

Copied: webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/tribes/MembershipManager.java (from r610664, webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/MembershipManager.java)
URL: http://svn.apache.org/viewvc/webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/tribes/MembershipManager.java?p2=webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/tribes/MembershipManager.java&p1=webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/MembershipManager.java&r1=610664&r2=612147&rev=612147&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/MembershipManager.java (original)
+++ webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/tribes/MembershipManager.java Tue Jan 15 08:21:22 2008
@@ -25,21 +25,21 @@
  * Responsible for managing the membership
  */
 public class MembershipManager {
-    private static final List members = new ArrayList();
+    private final List members = new ArrayList();
 
-    public synchronized static void memberAdded(Member member) {
+    public synchronized void memberAdded(Member member) {
         members.add(member);
     }
 
-    public synchronized static void memberDisappeared(Member member) {
+    public synchronized void memberDisappeared(Member member) {
         members.remove(member);
     }
 
-    public synchronized static Member[] getMembers() {
+    public synchronized Member[] getMembers() {
         return (Member[]) members.toArray(new Member[members.size()]);
     }
 
-    public synchronized static Member getLongestLivingMember() {
+    public synchronized Member getLongestLivingMember() {
         Member longestLivingMember = null;
         if (members.size() > 0) {
             Member member0 = (Member) members.get(0);
@@ -56,8 +56,8 @@
         return longestLivingMember;
     }
 
-    public synchronized static Member getRandomMember() {
-        if(members.size() == 0){
+    public synchronized Member getRandomMember() {
+        if (members.size() == 0) {
             return null;
         }
         int memberIndex = new Random().nextInt(members.size());

Modified: webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java
URL: http://svn.apache.org/viewvc/webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java?rev=612147&r1=612146&r2=612147&view=diff
==============================================================================
--- webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java (original)
+++ webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java Tue Jan 15 08:21:22 2008
@@ -22,7 +22,6 @@
 import org.apache.axiom.om.OMElement;
 import org.apache.axis2.AxisFault;
 import org.apache.axis2.clustering.ClusterManager;
-import org.apache.axis2.clustering.ClusteringCommand;
 import org.apache.axis2.clustering.ClusteringConstants;
 import org.apache.axis2.clustering.ClusteringFault;
 import org.apache.axis2.clustering.RequestBlockingHandler;
@@ -31,6 +30,7 @@
 import org.apache.axis2.clustering.context.ClusteringContextListener;
 import org.apache.axis2.clustering.context.ContextManager;
 import org.apache.axis2.clustering.context.DefaultContextManager;
+import org.apache.axis2.clustering.control.ControlCommand;
 import org.apache.axis2.clustering.control.GetConfigurationCommand;
 import org.apache.axis2.clustering.control.GetStateCommand;
 import org.apache.axis2.context.ConfigurationContext;
@@ -45,8 +45,14 @@
 import org.apache.catalina.tribes.ManagedChannel;
 import org.apache.catalina.tribes.Member;
 import org.apache.catalina.tribes.group.GroupChannel;
+import org.apache.catalina.tribes.group.Response;
+import org.apache.catalina.tribes.group.RpcChannel;
 import org.apache.catalina.tribes.group.interceptors.DomainFilterInterceptor;
+import org.apache.catalina.tribes.group.interceptors.TcpFailureDetector;
+import org.apache.catalina.tribes.group.interceptors.OrderInterceptor;
 import org.apache.catalina.tribes.transport.ReceiverBase;
+import org.apache.catalina.tribes.transport.ReplicationTransmitter;
+import org.apache.catalina.tribes.transport.MultiPointSender;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -54,9 +60,9 @@
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Random;
 
 public class TribesClusterManager implements ClusterManager {
+    public static final int MSG_ORDER_OPTION = 512;
     private static final Log log = LogFactory.getLog(TribesClusterManager.class);
 
     private DefaultConfigurationManager configurationManager;
@@ -64,13 +70,16 @@
 
     private HashMap parameters;
     private ManagedChannel channel;
+    private RpcChannel rpcChannel;
     private ConfigurationContext configurationContext;
-    private TribesControlCommandProcessor controlCmdProcessor;
+    private ControlCommandProcessor controlCmdProcessor;
     private ChannelListener channelListener;
+    private ChannelSender channelSender;
+    private MembershipManager membershipManager;
 
     public TribesClusterManager() {
         parameters = new HashMap();
-        controlCmdProcessor = new TribesControlCommandProcessor(configurationContext);
+        controlCmdProcessor = new ControlCommandProcessor(configurationContext);
     }
 
     public ContextManager getContextManager() {
@@ -83,8 +92,6 @@
 
     public void init() throws ClusteringFault {
 
-        // Until the clustering stuff is properly initialized, we have to block.
-        configurationContext.setProperty(ClusteringConstants.BLOCK_ALL_REQUESTS, "true");
         AxisConfiguration axisConfig = configurationContext.getAxisConfiguration();
         for (Iterator iterator = axisConfig.getInFlowPhases().iterator();
              iterator.hasNext();) {
@@ -122,19 +129,25 @@
                 }
             }
         }
-
-        ChannelSender sender = new ChannelSender();
-
-        channelListener = new ChannelListener(configurationContext,
-                                              configurationManager,
-                                              contextManager,
-                                              controlCmdProcessor,
-                                              sender);
-
-        controlCmdProcessor.setChannelSender(sender);
+        membershipManager = new MembershipManager();
         channel = new GroupChannel();
+        channelSender = new ChannelSender(channel, membershipManager, synchronizeAllMembers());
+        channelListener = new ChannelListener(configurationContext, configurationManager,
+                                              contextManager, controlCmdProcessor);
+
+        // Set the maximum number of retries, if message sending to a particular node fails
+        Parameter maxRetriesParam = getParameter("maxRetries");
+        int maxRetries = 10;
+        if (maxRetriesParam != null) {
+            maxRetries = Integer.parseInt((String) maxRetriesParam.getValue());
+        }
+        ReplicationTransmitter replicationTransmitter =
+                (ReplicationTransmitter) channel.getChannelSender();
+        MultiPointSender multiPointSender = replicationTransmitter.getTransport();
+        multiPointSender.setMaxRetryAttempts(maxRetries);
 
-        String localIP = System.getProperty("local.ip.address"); //TODO: Use ClusteringConstants.LOCAL_IP_ADDRESS
+        // Set the IP address that will be advertised by this node
+        String localIP = System.getProperty(ClusteringConstants.LOCAL_IP_ADDRESS);
         if (localIP != null) {
             ReceiverBase receiver = (ReceiverBase) channel.getChannelReceiver();
             receiver.setAddress(localIP);
@@ -148,6 +161,8 @@
         } else {
             domain = "apache.axis2.domain".getBytes();
         }
+
+        // Add a DomainFilterInterceptor
         channel.getMembershipService().setDomain(domain);
         DomainFilterInterceptor dfi = new DomainFilterInterceptor();
         dfi.setDomain(domain);
@@ -174,13 +189,23 @@
        mcastProps.setProperty("tcpListenPort", "4000");
        mcastProps.setProperty("tcpListenHost", "127.0.0.1");*/
 
-        /*TcpFailureDetector tcpFailureDetector = new TcpFailureDetector();
-        tcpFailureDetector.setPrevious(nbc);
+        // Add the OrderInterceptor to preserve sender ordering 
+        OrderInterceptor orderInterceptor = new OrderInterceptor();
+        orderInterceptor.setOptionFlag(MSG_ORDER_OPTION);
+        channel.addInterceptor(orderInterceptor);
+
+        // Add a AtMostOnceInterceptor to support at-most-once message processing semantics
+        AtMostOnceInterceptor atMostOnceInterceptor = new AtMostOnceInterceptor();
+        channel.addInterceptor(atMostOnceInterceptor);
+
+        // Add a reliable failure detector
+        TcpFailureDetector tcpFailureDetector = new TcpFailureDetector();
+        tcpFailureDetector.setPrevious(dfi);
         channel.addInterceptor(tcpFailureDetector);
-        tcpFailureDetector.*/
 
         channel.addChannelListener(channelListener);
-        TribesMembershipListener membershipListener = new TribesMembershipListener();
+
+        TribesMembershipListener membershipListener = new TribesMembershipListener(membershipManager);
         channel.addMembershipListener(membershipListener);
         try {
             channel.start(Channel.DEFAULT);
@@ -189,85 +214,106 @@
                 channel.stop(Channel.DEFAULT);
                 throw new ClusteringFault("Cannot join cluster using IP " + localHost +
                                           ". Please set an IP address other than " +
-                                          localHost + " in your /etc/hosts file and retry.");
+                                          localHost + " in your /etc/hosts file or set the " +
+                                          ClusteringConstants.LOCAL_IP_ADDRESS +
+                                          " System property and retry.");
             }
         } catch (ChannelException e) {
             throw new ClusteringFault("Error starting Tribes channel", e);
         }
-        sender.setChannel(channel);
-
-        Member[] members = channel.getMembers();
-        log.info("Local Tribes Member " + TribesUtil.getLocalHost(channel));
-        TribesUtil.printMembers(members);
 
-        if (configurationManager != null) { // If configuration management is enabled, get the latest config from a neighbour
-            configurationManager.setSender(sender);
-            getInitializationMessage(members, sender, new GetConfigurationCommand());
-        }
-
-        if (contextManager != null) { // If context replication is enabled, get the latest state from a neighbour
-            contextManager.setSender(sender);
+        // RpcChannel is a ChannelListener. When the reply to a particular request comes back, it
+        // picks it up. Each RPC is given a UUID, hence can correlate the request-response pair
+        RpcChannel rpcChannel =
+                new RpcChannel(domain, channel,
+                               new InitializationRequestHandler(controlCmdProcessor));
+
+        log.info("Local Member " + TribesUtil.getLocalHost(channel));
+        TribesUtil.printMembers(membershipManager);
+
+        // If configuration management is enabled, get the latest config from a neighbour
+        if (configurationManager != null) {
+            configurationManager.setSender(channelSender);
+            initializeSystem(rpcChannel, new GetConfigurationCommand());
+        }
+
+        // If context replication is enabled, get the latest state from a neighbour
+        if (contextManager != null) {
+            contextManager.setSender(channelSender);
             channelListener.setContextManager(contextManager);
-            getInitializationMessage(members, sender, new GetStateCommand());
-            ClusteringContextListener contextListener = new ClusteringContextListener(sender);
+            initializeSystem(rpcChannel, new GetStateCommand());
+            ClusteringContextListener contextListener = new ClusteringContextListener(channelSender);
             configurationContext.addContextListener(contextListener);
         }
+
         configurationContext.
                 setNonReplicableProperty(ClusteringConstants.CLUSTER_INITIALIZED, "true");
-        configurationContext.removeProperty(ClusteringConstants.BLOCK_ALL_REQUESTS);
     }
 
     /**
      * Get some information from a neighbour. This information will be used by this node to
      * initialize itself
      *
-     * @param members
-     * @param sender
-     * @param command
+     * @param rpcChannel The utility for sending RPC style messages to the channel
+     * @param command    The control command to send
+     * @throws ClusteringFault If initialization code failed on this node
      */
-    private void getInitializationMessage(Member[] members,
-                                          ChannelSender sender,
-                                          ClusteringCommand command) {
-        // If there is at least one member in the Tribe, get the current initialization info from a member
-        Random random = new Random();
-        int numberOfTries = 0; // Don't keep on trying infinitely
+    private void initializeSystem(RpcChannel rpcChannel, ControlCommand command)
+            throws ClusteringFault {
+        // If there is at least one member in the cluster,
+        //  get the current initialization info from a member
+        int numberOfTries = 0; // Don't keep on trying indefinitely
 
         // Keep track of members to whom we already sent an initialization command
         // Do not send another request to these members
         List sentMembersList = new ArrayList();
-        while (members.length > 0 &&
-               configurationContext.
-                       getPropertyNonReplicable(ClusteringConstants.CLUSTER_INITIALIZED) == null
-               && numberOfTries < 50) {
-
-            // While there are members and GetStateResponseCommand is not received do the following
+        sentMembersList.add(TribesUtil.getLocalHost(channel));
+        Member[] members = membershipManager.getMembers();
+        if(members.length == 0) return;
+
+        while (members.length > 0 && numberOfTries < 5) {
+            Member member = (numberOfTries == 0) ?
+                            membershipManager.getLongestLivingMember() : // First try to get from the longest member alive
+                            membershipManager.getRandomMember(); // Else get from a random member
+            String memberHost = TribesUtil.getHost(member);
             try {
-                members = channel.getMembers();
-                int memberIndex = random.nextInt(members.length);
-                Member member = members[memberIndex];
-                if (!sentMembersList.contains(TribesUtil.getHost(member))) {
-                    long tts = sender.sendToMember(command, member);
-                    configurationContext.
-                            setNonReplicableProperty(ClusteringConstants.TIME_TO_SEND,
-                                                     new Long(tts));
-                    sentMembersList.add(TribesUtil.getHost(member));
-                    log.debug("WAITING FOR STATE INITIALIZATION MESSAGE...");
-                    Thread.sleep(tts + 5);
+                if (!sentMembersList.contains(memberHost)) {
+                    Response[] responses = rpcChannel.send(new Member[]{member},
+                                                           command,
+                                                           RpcChannel.FIRST_REPLY,
+                                                           Channel.SEND_OPTIONS_ASYNCHRONOUS,
+                                                           10000);
+                    if (responses.length > 0) {
+                        ((ControlCommand) responses[0].getMessage()).execute(configurationContext); // Do the initialization
+                        break;
+                    }
+                }
+            } catch (ChannelException e) {
+                log.error("Cannot get initialization information from " +
+                          memberHost + ". Will retry in 2 secs.", e);
+                sentMembersList.add(memberHost);
+                try {
+                    Thread.sleep(2000);
+                } catch (InterruptedException ignored) {
+                    log.debug("Interrupted", ignored);
                 }
-            } catch (Exception e) {
-                log.error(e);
-                break;
             }
             numberOfTries++;
+            members = membershipManager.getMembers();
+            if(numberOfTries >= members.length){
+                break;
+            }
         }
     }
 
     public void setConfigurationManager(ConfigurationManager configurationManager) {
         this.configurationManager = (DefaultConfigurationManager) configurationManager;
+        this.configurationManager.setSender(channelSender);
     }
 
     public void setContextManager(ContextManager contextManager) {
         this.contextManager = (DefaultContextManager) contextManager;
+        this.contextManager.setSender(channelSender);
     }
 
     public void addParameter(Parameter param) throws AxisFault {
@@ -291,13 +337,8 @@
     }
 
     public boolean isParameterLocked(String parameterName) {
-
         Parameter parameter = (Parameter) parameters.get(parameterName);
-        if (parameter != null) {
-            return parameter.isLocked();
-        }
-
-        return false;
+        return parameter != null && parameter.isLocked();
     }
 
     public void removeParameter(Parameter param) throws AxisFault {
@@ -308,6 +349,8 @@
         log.debug("Enter: TribesClusterManager::shutdown");
         if (channel != null) {
             try {
+                channel.removeChannelListener(rpcChannel);
+                channel.removeChannelListener(channelListener);
                 channel.stop(Channel.DEFAULT);
             } catch (ChannelException e) {
 
@@ -327,5 +370,24 @@
         if (channelListener != null) {
             channelListener.setConfigurationContext(configurationContext);
         }
+        if (configurationManager != null) {
+            configurationManager.setConfigurationContext(configurationContext);
+        }
+        if (contextManager != null) {
+            contextManager.setConfigurationContext(configurationContext);
+        }
+    }
+
+    /**
+     * Method to check whether all members in the cluster have to be kep in sync at all times.
+     * Typically, this will require each member in the cluster to ACKnowledge receipt of a
+     * particular message, which may have a significant performance hit.
+     *
+     * @return true - if all members in the cluster should be kept in sync at all times, false
+     *         otherwise
+     */
+    public boolean synchronizeAllMembers() {
+        Parameter syncAllParam = getParameter(ClusteringConstants.SYNCHRONIZE_ALL_MEMBERS);
+        return syncAllParam == null || Boolean.parseBoolean((String) syncAllParam.getValue());
     }
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: axis-cvs-unsubscribe@ws.apache.org
For additional commands, e-mail: axis-cvs-help@ws.apache.org


Mime
View raw message