cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbel...@apache.org
Subject svn commit: r799331 [13/29] - in /incubator/cassandra/trunk: ./ src/java/org/apache/cassandra/concurrent/ src/java/org/apache/cassandra/config/ src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/dht/ src/java/org/apache/cassandra/gms/ src/...
Date Thu, 30 Jul 2009 15:30:27 GMT
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/FailureDetector.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/FailureDetector.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/FailureDetector.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/FailureDetector.java Thu Jul 30 15:30:21 2009
@@ -1,319 +1,319 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.gms;
-
-import java.io.FileOutputStream;
-import java.lang.management.ManagementFactory;
-import java.net.UnknownHostException;
-import java.util.*;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
-
-import org.apache.commons.lang.StringUtils;
-
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.net.EndPoint;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.LogUtil;
-import org.apache.cassandra.utils.BoundedStatsDeque;
-import org.apache.log4j.Logger;
-
-/**
- * This FailureDetector is an implementation of the paper titled
- * "The Phi Accrual Failure Detector" by Hayashibara. 
- * Check the paper and the <i>IFailureDetector</i> interface for details.
- * 
- * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
- */
-public class FailureDetector implements IFailureDetector, FailureDetectorMBean
-{
-    private static Logger logger_ = Logger.getLogger(FailureDetector.class);
-    private static final int sampleSize_ = 1000;
-    private static final int phiSuspectThreshold_ = 5;
-    private static final int phiConvictThreshold_ = 8;
-    /* The Failure Detector has to have been up for at least 1 min. */
-    private static final long uptimeThreshold_ = 60000;
-    private static IFailureDetector failureDetector_;
-    /* Used to lock the factory for creation of FailureDetector instance */
-    private static Lock createLock_ = new ReentrantLock();
-    /* The time when the module was instantiated. */
-    private static long creationTime_;
-    
-    public static IFailureDetector instance()
-    {        
-        if ( failureDetector_ == null )
-        {
-            FailureDetector.createLock_.lock();
-            try
-            {
-                if ( failureDetector_ == null )
-                {
-                    failureDetector_ = new FailureDetector();
-                }
-            }
-            finally
-            {
-                createLock_.unlock();
-            }
-        }        
-        return failureDetector_;
-    }
-    
-    private Map<EndPoint, ArrivalWindow> arrivalSamples_ = new Hashtable<EndPoint, ArrivalWindow>();
-    private List<IFailureDetectionEventListener> fdEvntListeners_ = new ArrayList<IFailureDetectionEventListener>();
-    
-    public FailureDetector()
-    {
-        creationTime_ = System.currentTimeMillis();
-        // Register this instance with JMX
-        try
-        {
-            MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
-            mbs.registerMBean(this, new ObjectName("org.apache.cassandra.gms:type=FailureDetector"));
-        }
-        catch (Exception e)
-        {
-            logger_.error(LogUtil.throwableToString(e));
-        }
-    }
-    
-    /**
-     * Dump the inter arrival times for examination if necessary.
-     */
-    public void dumpInterArrivalTimes()
-    {
-        try
-        {
-            FileOutputStream fos = new FileOutputStream("/var/tmp/output-" + System.currentTimeMillis() + ".dat", true);
-            fos.write(toString().getBytes());
-            fos.close();
-        }
-        catch(Throwable th)
-        {
-            logger_.warn(LogUtil.throwableToString(th));
-        }
-    }
-    
-    /**
-     * We dump the arrival window for any endpoint only if the 
-     * local Failure Detector module has been up for more than a 
-     * minute.
-     * 
-     * @param ep for which the arrival window needs to be dumped.
-     */
-    private void dumpInterArrivalTimes(EndPoint ep)
-    {
-        long now = System.currentTimeMillis();
-        if ( (now - FailureDetector.creationTime_) <= FailureDetector.uptimeThreshold_ )
-            return;
-        try
-        {
-            FileOutputStream fos = new FileOutputStream("/var/tmp/output-" + System.currentTimeMillis() + "-" + ep + ".dat", true);
-            ArrivalWindow hWnd = arrivalSamples_.get(ep);
-            fos.write(hWnd.toString().getBytes());
-            fos.close();
-        }
-        catch(Throwable th)
-        {
-            logger_.warn(LogUtil.throwableToString(th));
-        }
-    }
-    
-    public boolean isAlive(EndPoint ep)
-    {
-        try
-        {
-            /* If the endpoint in question is the local endpoint return true. */
-            String localHost = FBUtilities.getHostAddress();
-            if ( localHost.equals( ep.getHost() ) )
-                    return true;
-        }
-        catch( UnknownHostException ex )
-        {
-            logger_.info( LogUtil.throwableToString(ex) );
-        }
-    	/* Incoming port is assumed to be the Storage port. We need to change it to the control port */
-    	EndPoint ep2 = new EndPoint(ep.getHost(), DatabaseDescriptor.getControlPort());        
-        EndPointState epState = Gossiper.instance().getEndPointStateForEndPoint(ep2);
-        return epState.isAlive();
-    }
-    
-    public void report(EndPoint ep)
-    {
-        if (logger_.isTraceEnabled())
-            logger_.trace("reporting " + ep);
-        long now = System.currentTimeMillis();
-        ArrivalWindow heartbeatWindow = arrivalSamples_.get(ep);
-        if ( heartbeatWindow == null )
-        {
-            heartbeatWindow = new ArrivalWindow(sampleSize_);
-            arrivalSamples_.put(ep, heartbeatWindow);
-        }
-        heartbeatWindow.add(now);
-    }
-    
-    public void interpret(EndPoint ep)
-    {
-        ArrivalWindow hbWnd = arrivalSamples_.get(ep);
-        if ( hbWnd == null )
-        {            
-            return;
-        }
-        long now = System.currentTimeMillis();
-        /* We need this so that we do not suspect a convict. */
-        boolean isConvicted = false;
-        double phi = hbWnd.phi(now);
-        if (logger_.isTraceEnabled())
-            logger_.trace("PHI for " + ep + " : " + phi);
-        
-        /*
-        if ( phi > phiConvictThreshold_ )
-        {            
-            isConvicted = true;     
-            for ( IFailureDetectionEventListener listener : fdEvntListeners_ )
-            {
-                listener.convict(ep);                
-            }
-        }
-        */
-        if ( !isConvicted && phi > phiSuspectThreshold_ )
-        {     
-            for ( IFailureDetectionEventListener listener : fdEvntListeners_ )
-            {
-                listener.suspect(ep);
-            }
-        }        
-    }
-    
-    public void registerFailureDetectionEventListener(IFailureDetectionEventListener listener)
-    {
-        fdEvntListeners_.add(listener);
-    }
-    
-    public void unregisterFailureDetectionEventListener(IFailureDetectionEventListener listener)
-    {
-        fdEvntListeners_.remove(listener);
-    }
-    
-    public String toString()
-    {
-        StringBuilder sb = new StringBuilder();
-        Set<EndPoint> eps = arrivalSamples_.keySet();
-        
-        sb.append("-----------------------------------------------------------------------");
-        for ( EndPoint ep : eps )
-        {
-            ArrivalWindow hWnd = arrivalSamples_.get(ep);
-            sb.append(ep + " : ");
-            sb.append(hWnd.toString());
-            sb.append( System.getProperty("line.separator") );
-        }
-        sb.append("-----------------------------------------------------------------------");
-        return sb.toString();
-    }
-    
-    public static void main(String[] args) throws Throwable
-    {           
-    }
-}
-
-class ArrivalWindow
-{
-    private static Logger logger_ = Logger.getLogger(ArrivalWindow.class);
-    private double tLast_ = 0L;
-    private BoundedStatsDeque arrivalIntervals_;
-
-    ArrivalWindow(int size)
-    {
-        arrivalIntervals_ = new BoundedStatsDeque(size);
-    }
-    
-    synchronized void add(double value)
-    {
-        double interArrivalTime;
-        if ( tLast_ > 0L )
-        {                        
-            interArrivalTime = (value - tLast_);            
-        }
-        else
-        {
-            interArrivalTime = Gossiper.intervalInMillis_ / 2;
-        }
-        tLast_ = value;            
-        arrivalIntervals_.add(interArrivalTime);        
-    }
-    
-    synchronized double sum()
-    {
-        return arrivalIntervals_.sum();
-    }
-    
-    synchronized double sumOfDeviations()
-    {
-        return arrivalIntervals_.sumOfDeviations();
-    }
-    
-    synchronized double mean()
-    {
-        return arrivalIntervals_.mean();
-    }
-    
-    synchronized double variance()
-    {
-        return arrivalIntervals_.variance();
-    }
-    
-    double stdev()
-    {
-        return arrivalIntervals_.stdev();
-    }
-    
-    void clear()
-    {
-        arrivalIntervals_.clear();
-    }
-    
-    double p(double t)
-    {
-        double mean = mean();
-        double exponent = (-1)*(t)/mean;
-        return 1 - ( 1 - Math.pow(Math.E, exponent) );
-    }
-    
-    double phi(long tnow)
-    {            
-        int size = arrivalIntervals_.size();
-        double log = 0d;
-        if ( size > 0 )
-        {
-            double t = tnow - tLast_;                
-            double probability = p(t);       
-            log = (-1) * Math.log10( probability );                                 
-        }
-        return log;           
-    } 
-    
-    public String toString()
-    {
-        return StringUtils.join(arrivalIntervals_.iterator(), " ");
-    }
-}
-
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.gms;
+
+import java.io.FileOutputStream;
+import java.lang.management.ManagementFactory;
+import java.net.UnknownHostException;
+import java.util.*;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
+import org.apache.commons.lang.StringUtils;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.cassandra.utils.BoundedStatsDeque;
+import org.apache.log4j.Logger;
+
+/**
+ * This FailureDetector is an implementation of the paper titled
+ * "The Phi Accrual Failure Detector" by Hayashibara. 
+ * Check the paper and the <i>IFailureDetector</i> interface for details.
+ * 
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+public class FailureDetector implements IFailureDetector, FailureDetectorMBean
+{
+    private static Logger logger_ = Logger.getLogger(FailureDetector.class);
+    private static final int sampleSize_ = 1000;
+    private static final int phiSuspectThreshold_ = 5;
+    private static final int phiConvictThreshold_ = 8;
+    /* The Failure Detector has to have been up for at least 1 min. */
+    private static final long uptimeThreshold_ = 60000;
+    private static IFailureDetector failureDetector_;
+    /* Used to lock the factory for creation of FailureDetector instance */
+    private static Lock createLock_ = new ReentrantLock();
+    /* The time when the module was instantiated. */
+    private static long creationTime_;
+    
+    public static IFailureDetector instance()
+    {        
+        if ( failureDetector_ == null )
+        {
+            FailureDetector.createLock_.lock();
+            try
+            {
+                if ( failureDetector_ == null )
+                {
+                    failureDetector_ = new FailureDetector();
+                }
+            }
+            finally
+            {
+                createLock_.unlock();
+            }
+        }        
+        return failureDetector_;
+    }
+    
+    private Map<EndPoint, ArrivalWindow> arrivalSamples_ = new Hashtable<EndPoint, ArrivalWindow>();
+    private List<IFailureDetectionEventListener> fdEvntListeners_ = new ArrayList<IFailureDetectionEventListener>();
+    
+    public FailureDetector()
+    {
+        creationTime_ = System.currentTimeMillis();
+        // Register this instance with JMX
+        try
+        {
+            MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+            mbs.registerMBean(this, new ObjectName("org.apache.cassandra.gms:type=FailureDetector"));
+        }
+        catch (Exception e)
+        {
+            logger_.error(LogUtil.throwableToString(e));
+        }
+    }
+    
+    /**
+     * Dump the inter arrival times for examination if necessary.
+     */
+    public void dumpInterArrivalTimes()
+    {
+        try
+        {
+            FileOutputStream fos = new FileOutputStream("/var/tmp/output-" + System.currentTimeMillis() + ".dat", true);
+            fos.write(toString().getBytes());
+            fos.close();
+        }
+        catch(Throwable th)
+        {
+            logger_.warn(LogUtil.throwableToString(th));
+        }
+    }
+    
+    /**
+     * We dump the arrival window for any endpoint only if the 
+     * local Failure Detector module has been up for more than a 
+     * minute.
+     * 
+     * @param ep for which the arrival window needs to be dumped.
+     */
+    private void dumpInterArrivalTimes(EndPoint ep)
+    {
+        long now = System.currentTimeMillis();
+        if ( (now - FailureDetector.creationTime_) <= FailureDetector.uptimeThreshold_ )
+            return;
+        try
+        {
+            FileOutputStream fos = new FileOutputStream("/var/tmp/output-" + System.currentTimeMillis() + "-" + ep + ".dat", true);
+            ArrivalWindow hWnd = arrivalSamples_.get(ep);
+            fos.write(hWnd.toString().getBytes());
+            fos.close();
+        }
+        catch(Throwable th)
+        {
+            logger_.warn(LogUtil.throwableToString(th));
+        }
+    }
+    
+    public boolean isAlive(EndPoint ep)
+    {
+        try
+        {
+            /* If the endpoint in question is the local endpoint return true. */
+            String localHost = FBUtilities.getHostAddress();
+            if ( localHost.equals( ep.getHost() ) )
+                    return true;
+        }
+        catch( UnknownHostException ex )
+        {
+            logger_.info( LogUtil.throwableToString(ex) );
+        }
+    	/* Incoming port is assumed to be the Storage port. We need to change it to the control port */
+    	EndPoint ep2 = new EndPoint(ep.getHost(), DatabaseDescriptor.getControlPort());        
+        EndPointState epState = Gossiper.instance().getEndPointStateForEndPoint(ep2);
+        return epState.isAlive();
+    }
+    
+    public void report(EndPoint ep)
+    {
+        if (logger_.isTraceEnabled())
+            logger_.trace("reporting " + ep);
+        long now = System.currentTimeMillis();
+        ArrivalWindow heartbeatWindow = arrivalSamples_.get(ep);
+        if ( heartbeatWindow == null )
+        {
+            heartbeatWindow = new ArrivalWindow(sampleSize_);
+            arrivalSamples_.put(ep, heartbeatWindow);
+        }
+        heartbeatWindow.add(now);
+    }
+    
+    public void interpret(EndPoint ep)
+    {
+        ArrivalWindow hbWnd = arrivalSamples_.get(ep);
+        if ( hbWnd == null )
+        {            
+            return;
+        }
+        long now = System.currentTimeMillis();
+        /* We need this so that we do not suspect a convict. */
+        boolean isConvicted = false;
+        double phi = hbWnd.phi(now);
+        if (logger_.isTraceEnabled())
+            logger_.trace("PHI for " + ep + " : " + phi);
+        
+        /*
+        if ( phi > phiConvictThreshold_ )
+        {            
+            isConvicted = true;     
+            for ( IFailureDetectionEventListener listener : fdEvntListeners_ )
+            {
+                listener.convict(ep);                
+            }
+        }
+        */
+        if ( !isConvicted && phi > phiSuspectThreshold_ )
+        {     
+            for ( IFailureDetectionEventListener listener : fdEvntListeners_ )
+            {
+                listener.suspect(ep);
+            }
+        }        
+    }
+    
+    public void registerFailureDetectionEventListener(IFailureDetectionEventListener listener)
+    {
+        fdEvntListeners_.add(listener);
+    }
+    
+    public void unregisterFailureDetectionEventListener(IFailureDetectionEventListener listener)
+    {
+        fdEvntListeners_.remove(listener);
+    }
+    
+    public String toString()
+    {
+        StringBuilder sb = new StringBuilder();
+        Set<EndPoint> eps = arrivalSamples_.keySet();
+        
+        sb.append("-----------------------------------------------------------------------");
+        for ( EndPoint ep : eps )
+        {
+            ArrivalWindow hWnd = arrivalSamples_.get(ep);
+            sb.append(ep + " : ");
+            sb.append(hWnd.toString());
+            sb.append( System.getProperty("line.separator") );
+        }
+        sb.append("-----------------------------------------------------------------------");
+        return sb.toString();
+    }
+    
+    public static void main(String[] args) throws Throwable
+    {           
+    }
+}
+
+class ArrivalWindow
+{
+    private static Logger logger_ = Logger.getLogger(ArrivalWindow.class);
+    private double tLast_ = 0L;
+    private BoundedStatsDeque arrivalIntervals_;
+
+    ArrivalWindow(int size)
+    {
+        arrivalIntervals_ = new BoundedStatsDeque(size);
+    }
+    
+    synchronized void add(double value)
+    {
+        double interArrivalTime;
+        if ( tLast_ > 0L )
+        {                        
+            interArrivalTime = (value - tLast_);            
+        }
+        else
+        {
+            interArrivalTime = Gossiper.intervalInMillis_ / 2;
+        }
+        tLast_ = value;            
+        arrivalIntervals_.add(interArrivalTime);        
+    }
+    
+    synchronized double sum()
+    {
+        return arrivalIntervals_.sum();
+    }
+    
+    synchronized double sumOfDeviations()
+    {
+        return arrivalIntervals_.sumOfDeviations();
+    }
+    
+    synchronized double mean()
+    {
+        return arrivalIntervals_.mean();
+    }
+    
+    synchronized double variance()
+    {
+        return arrivalIntervals_.variance();
+    }
+    
+    double stdev()
+    {
+        return arrivalIntervals_.stdev();
+    }
+    
+    void clear()
+    {
+        arrivalIntervals_.clear();
+    }
+    
+    double p(double t)
+    {
+        double mean = mean();
+        double exponent = (-1)*(t)/mean;
+        return 1 - ( 1 - Math.pow(Math.E, exponent) );
+    }
+    
+    double phi(long tnow)
+    {            
+        int size = arrivalIntervals_.size();
+        double log = 0d;
+        if ( size > 0 )
+        {
+            double t = tnow - tLast_;                
+            double probability = p(t);       
+            log = (-1) * Math.log10( probability );                                 
+        }
+        return log;           
+    } 
+    
+    public String toString()
+    {
+        return StringUtils.join(arrivalIntervals_.iterator(), " ");
+    }
+}
+

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/FailureDetectorMBean.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/FailureDetectorMBean.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/FailureDetectorMBean.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/FailureDetectorMBean.java Thu Jul 30 15:30:21 2009
@@ -1,24 +1,24 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.gms;
-
-public interface FailureDetectorMBean
-{
-    public void dumpInterArrivalTimes();
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.gms;
+
+public interface FailureDetectorMBean
+{
+    public void dumpInterArrivalTimes();
+}

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigest.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigest.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigest.java Thu Jul 30 15:30:21 2009
@@ -1,110 +1,110 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.gms;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-
-import org.apache.cassandra.io.ICompactSerializer;
-import org.apache.cassandra.net.CompactEndPointSerializationHelper;
-import org.apache.cassandra.net.EndPoint;
-import org.apache.cassandra.net.*;
-
-/**
- * Contains information about a specified list of EndPoints and the largest version 
- * of the state they have generated as known by the local endpoint.
- * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
- */
-
-public class GossipDigest implements Comparable<GossipDigest>
-{
-    private static ICompactSerializer<GossipDigest> serializer_;
-    static
-    {
-        serializer_ = new GossipDigestSerializer();
-    }
-    
-    EndPoint endPoint_;
-    int generation_;
-    int maxVersion_;
-
-    public static ICompactSerializer<GossipDigest> serializer()
-    {
-        return serializer_;
-    }
-    
-    GossipDigest(EndPoint endPoint, int generation, int maxVersion)
-    {
-        endPoint_ = endPoint;
-        generation_ = generation; 
-        maxVersion_ = maxVersion;
-    }
-    
-    EndPoint getEndPoint()
-    {
-        return endPoint_;
-    }
-    
-    int getGeneration()
-    {
-        return generation_;
-    }
-    
-    int getMaxVersion()
-    {
-        return maxVersion_;
-    }
-    
-    public int compareTo(GossipDigest gDigest)
-    {
-        if ( generation_ != gDigest.generation_ )
-            return ( generation_ - gDigest.generation_ );
-        return (maxVersion_ - gDigest.maxVersion_);
-    }
-    
-    public String toString()
-    {
-        StringBuilder sb = new StringBuilder();
-        sb.append(endPoint_);
-        sb.append(":");
-        sb.append(generation_);
-        sb.append(":");
-        sb.append(maxVersion_);
-        return sb.toString();
-    }
-}
-
-class GossipDigestSerializer implements ICompactSerializer<GossipDigest>
-{       
-    public void serialize(GossipDigest gDigest, DataOutputStream dos) throws IOException
-    {        
-        CompactEndPointSerializationHelper.serialize(gDigest.endPoint_, dos);
-        dos.writeInt(gDigest.generation_);
-        dos.writeInt(gDigest.maxVersion_);
-    }
-
-    public GossipDigest deserialize(DataInputStream dis) throws IOException
-    {
-        EndPoint endPoint = CompactEndPointSerializationHelper.deserialize(dis);
-        int generation = dis.readInt();
-        int version = dis.readInt();
-        return new GossipDigest(endPoint, generation, version);
-    }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.gms;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.net.CompactEndPointSerializationHelper;
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.net.*;
+
+/**
+ * Contains information about a specified list of EndPoints and the largest version 
+ * of the state they have generated as known by the local endpoint.
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class GossipDigest implements Comparable<GossipDigest>
+{
+    private static ICompactSerializer<GossipDigest> serializer_;
+    static
+    {
+        serializer_ = new GossipDigestSerializer();
+    }
+    
+    EndPoint endPoint_;
+    int generation_;
+    int maxVersion_;
+
+    public static ICompactSerializer<GossipDigest> serializer()
+    {
+        return serializer_;
+    }
+    
+    GossipDigest(EndPoint endPoint, int generation, int maxVersion)
+    {
+        endPoint_ = endPoint;
+        generation_ = generation; 
+        maxVersion_ = maxVersion;
+    }
+    
+    EndPoint getEndPoint()
+    {
+        return endPoint_;
+    }
+    
+    int getGeneration()
+    {
+        return generation_;
+    }
+    
+    int getMaxVersion()
+    {
+        return maxVersion_;
+    }
+    
+    public int compareTo(GossipDigest gDigest)
+    {
+        if ( generation_ != gDigest.generation_ )
+            return ( generation_ - gDigest.generation_ );
+        return (maxVersion_ - gDigest.maxVersion_);
+    }
+    
+    public String toString()
+    {
+        StringBuilder sb = new StringBuilder();
+        sb.append(endPoint_);
+        sb.append(":");
+        sb.append(generation_);
+        sb.append(":");
+        sb.append(maxVersion_);
+        return sb.toString();
+    }
+}
+
+class GossipDigestSerializer implements ICompactSerializer<GossipDigest>
+{       
+    public void serialize(GossipDigest gDigest, DataOutputStream dos) throws IOException
+    {        
+        CompactEndPointSerializationHelper.serialize(gDigest.endPoint_, dos);
+        dos.writeInt(gDigest.generation_);
+        dos.writeInt(gDigest.maxVersion_);
+    }
+
+    public GossipDigest deserialize(DataInputStream dis) throws IOException
+    {
+        EndPoint endPoint = CompactEndPointSerializationHelper.deserialize(dis);
+        int generation = dis.readInt();
+        int version = dis.readInt();
+        return new GossipDigest(endPoint, generation, version);
+    }
+}

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAck2Message.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAck2Message.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAck2Message.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAck2Message.java Thu Jul 30 15:30:21 2009
@@ -1,77 +1,77 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.gms;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.*;
-import org.apache.cassandra.io.ICompactSerializer;
-import org.apache.cassandra.net.EndPoint;
-import org.apache.cassandra.net.*;
-
-
-/**
- * This message gets sent out as a result of the receipt of a GossipDigestAckMessage. This the 
- * last stage of the 3 way messaging of the Gossip protocol.
- *  
- * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
- */
-
-class GossipDigestAck2Message
-{
-    private static  ICompactSerializer<GossipDigestAck2Message> serializer_;
-    static
-    {
-        serializer_ = new GossipDigestAck2MessageSerializer();
-    }
-    
-    Map<EndPoint, EndPointState> epStateMap_ = new HashMap<EndPoint, EndPointState>();
-
-    public static ICompactSerializer<GossipDigestAck2Message> serializer()
-    {
-        return serializer_;
-    }
-    
-    GossipDigestAck2Message(Map<EndPoint, EndPointState> epStateMap)
-    {
-        epStateMap_ = epStateMap;
-    }
-        
-    Map<EndPoint, EndPointState> getEndPointStateMap()
-    {
-         return epStateMap_;
-    }
-}
-
-class GossipDigestAck2MessageSerializer implements ICompactSerializer<GossipDigestAck2Message>
-{
-    public void serialize(GossipDigestAck2Message gDigestAck2Message, DataOutputStream dos) throws IOException
-    {
-        /* Use the EndPointState */
-        EndPointStatesSerializationHelper.serialize(gDigestAck2Message.epStateMap_, dos);
-    }
-
-    public GossipDigestAck2Message deserialize(DataInputStream dis) throws IOException
-    {
-        Map<EndPoint, EndPointState> epStateMap = EndPointStatesSerializationHelper.deserialize(dis);
-        return new GossipDigestAck2Message(epStateMap);        
-    }
-}
-
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.gms;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.*;
+import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.net.*;
+
+
+/**
+ * This message gets sent out as a result of the receipt of a GossipDigestAckMessage. This the 
+ * last stage of the 3 way messaging of the Gossip protocol.
+ *  
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+class GossipDigestAck2Message
+{
+    private static  ICompactSerializer<GossipDigestAck2Message> serializer_;
+    static
+    {
+        serializer_ = new GossipDigestAck2MessageSerializer();
+    }
+    
+    Map<EndPoint, EndPointState> epStateMap_ = new HashMap<EndPoint, EndPointState>();
+
+    public static ICompactSerializer<GossipDigestAck2Message> serializer()
+    {
+        return serializer_;
+    }
+    
+    GossipDigestAck2Message(Map<EndPoint, EndPointState> epStateMap)
+    {
+        epStateMap_ = epStateMap;
+    }
+        
+    Map<EndPoint, EndPointState> getEndPointStateMap()
+    {
+         return epStateMap_;
+    }
+}
+
+class GossipDigestAck2MessageSerializer implements ICompactSerializer<GossipDigestAck2Message>
+{
+    public void serialize(GossipDigestAck2Message gDigestAck2Message, DataOutputStream dos) throws IOException
+    {
+        /* Use the EndPointState */
+        EndPointStatesSerializationHelper.serialize(gDigestAck2Message.epStateMap_, dos);
+    }
+
+    public GossipDigestAck2Message deserialize(DataInputStream dis) throws IOException
+    {
+        Map<EndPoint, EndPointState> epStateMap = EndPointStatesSerializationHelper.deserialize(dis);
+        return new GossipDigestAck2Message(epStateMap);        
+    }
+}
+

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAckMessage.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAckMessage.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAckMessage.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAckMessage.java Thu Jul 30 15:30:21 2009
@@ -1,102 +1,102 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.gms;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.*;
-
-import org.apache.cassandra.io.ICompactSerializer;
-import org.apache.cassandra.net.EndPoint;
-
-
-
-/**
- * This message gets sent out as a result of the receipt of a GossipDigestSynMessage by an
- * endpoint. This is the 2 stage of the 3 way messaging in the Gossip protocol.
- * 
- * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
- */
-
-class GossipDigestAckMessage
-{
-    private static ICompactSerializer<GossipDigestAckMessage> serializer_;
-    static
-    {
-        serializer_ = new GossipDigestAckMessageSerializer();
-    }
-    
-    List<GossipDigest> gDigestList_ = new ArrayList<GossipDigest>();
-    Map<EndPoint, EndPointState> epStateMap_ = new HashMap<EndPoint, EndPointState>();
-    
-    static ICompactSerializer<GossipDigestAckMessage> serializer()
-    {
-        return serializer_;
-    }
-    
-    GossipDigestAckMessage(List<GossipDigest> gDigestList, Map<EndPoint, EndPointState> epStateMap)
-    {
-        gDigestList_ = gDigestList;
-        epStateMap_ = epStateMap;
-    }
-    
-    void addGossipDigest(EndPoint ep, int generation, int version)
-    {
-        gDigestList_.add( new GossipDigest(ep, generation, version) );
-    }
-    
-    List<GossipDigest> getGossipDigestList()
-    {
-        return gDigestList_;
-    }
-    
-    Map<EndPoint, EndPointState> getEndPointStateMap()
-    {
-        return epStateMap_;
-    }
-}
-
-class GossipDigestAckMessageSerializer implements ICompactSerializer<GossipDigestAckMessage>
-{
-    public void serialize(GossipDigestAckMessage gDigestAckMessage, DataOutputStream dos) throws IOException
-    {
-        /* Use the helper to serialize the GossipDigestList */
-        boolean bContinue = GossipDigestSerializationHelper.serialize(gDigestAckMessage.gDigestList_, dos);
-        dos.writeBoolean(bContinue);
-        /* Use the EndPointState */
-        if ( bContinue )
-        {
-            EndPointStatesSerializationHelper.serialize(gDigestAckMessage.epStateMap_, dos);            
-        }
-    }
-
-    public GossipDigestAckMessage deserialize(DataInputStream dis) throws IOException
-    {
-        Map<EndPoint, EndPointState> epStateMap = new HashMap<EndPoint, EndPointState>();
-        List<GossipDigest> gDigestList = GossipDigestSerializationHelper.deserialize(dis);                
-        boolean bContinue = dis.readBoolean();
-
-        if ( bContinue )
-        {
-            epStateMap = EndPointStatesSerializationHelper.deserialize(dis);                                    
-        }
-        return new GossipDigestAckMessage(gDigestList, epStateMap);
-    }
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.gms;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.*;
+
+import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.net.EndPoint;
+
+
+
+/**
+ * This message gets sent out as a result of the receipt of a GossipDigestSynMessage by an
+ * endpoint. This is the 2 stage of the 3 way messaging in the Gossip protocol.
+ * 
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+class GossipDigestAckMessage
+{
+    private static ICompactSerializer<GossipDigestAckMessage> serializer_;
+    static
+    {
+        serializer_ = new GossipDigestAckMessageSerializer();
+    }
+    
+    List<GossipDigest> gDigestList_ = new ArrayList<GossipDigest>();
+    Map<EndPoint, EndPointState> epStateMap_ = new HashMap<EndPoint, EndPointState>();
+    
+    static ICompactSerializer<GossipDigestAckMessage> serializer()
+    {
+        return serializer_;
+    }
+    
+    GossipDigestAckMessage(List<GossipDigest> gDigestList, Map<EndPoint, EndPointState> epStateMap)
+    {
+        gDigestList_ = gDigestList;
+        epStateMap_ = epStateMap;
+    }
+    
+    void addGossipDigest(EndPoint ep, int generation, int version)
+    {
+        gDigestList_.add( new GossipDigest(ep, generation, version) );
+    }
+    
+    List<GossipDigest> getGossipDigestList()
+    {
+        return gDigestList_;
+    }
+    
+    Map<EndPoint, EndPointState> getEndPointStateMap()
+    {
+        return epStateMap_;
+    }
+}
+
+class GossipDigestAckMessageSerializer implements ICompactSerializer<GossipDigestAckMessage>
+{
+    public void serialize(GossipDigestAckMessage gDigestAckMessage, DataOutputStream dos) throws IOException
+    {
+        /* Use the helper to serialize the GossipDigestList */
+        boolean bContinue = GossipDigestSerializationHelper.serialize(gDigestAckMessage.gDigestList_, dos);
+        dos.writeBoolean(bContinue);
+        /* Use the EndPointState */
+        if ( bContinue )
+        {
+            EndPointStatesSerializationHelper.serialize(gDigestAckMessage.epStateMap_, dos);            
+        }
+    }
+
+    public GossipDigestAckMessage deserialize(DataInputStream dis) throws IOException
+    {
+        Map<EndPoint, EndPointState> epStateMap = new HashMap<EndPoint, EndPointState>();
+        List<GossipDigest> gDigestList = GossipDigestSerializationHelper.deserialize(dis);                
+        boolean bContinue = dis.readBoolean();
+
+        if ( bContinue )
+        {
+            epStateMap = EndPointStatesSerializationHelper.deserialize(dis);                                    
+        }
+        return new GossipDigestAckMessage(gDigestList, epStateMap);
+    }
 }
\ No newline at end of file

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestSynMessage.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestSynMessage.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestSynMessage.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestSynMessage.java Thu Jul 30 15:30:21 2009
@@ -1,184 +1,184 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.gms;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.*;
-
-import org.apache.cassandra.io.ICompactSerializer;
-import org.apache.cassandra.net.CompactEndPointSerializationHelper;
-import org.apache.cassandra.net.EndPoint;
-import org.apache.cassandra.utils.Log4jLogger;
-import org.apache.log4j.Logger;
-import org.apache.cassandra.utils.*;
-
-
-/**
- * This is the first message that gets sent out as a start of the Gossip protocol in a 
- * round. 
- * 
- * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
- */
-
-class GossipDigestSynMessage
-{
-    private static ICompactSerializer<GossipDigestSynMessage> serializer_;
-    static
-    {
-        serializer_ = new GossipDigestSynMessageSerializer();
-    }
-    
-    String clusterId_;
-    List<GossipDigest> gDigests_ = new ArrayList<GossipDigest>();
-
-    public static ICompactSerializer<GossipDigestSynMessage> serializer()
-    {
-        return serializer_;
-    }
- 
-    public GossipDigestSynMessage(String clusterId, List<GossipDigest> gDigests)
-    {      
-        clusterId_ = clusterId;
-        gDigests_ = gDigests;
-    }
-    
-    List<GossipDigest> getGossipDigests()
-    {
-        return gDigests_;
-    }
-}
-
-class GossipDigestSerializationHelper
-{
-    private static Logger logger_ = Logger.getLogger(GossipDigestSerializationHelper.class);
-    
-    static boolean serialize(List<GossipDigest> gDigestList, DataOutputStream dos) throws IOException
-    {
-        boolean bVal = true;
-        int size = gDigestList.size();                        
-        dos.writeInt(size);
-        
-        int estimate = 0;            
-        for ( GossipDigest gDigest : gDigestList )
-        {
-            if ( Gossiper.MAX_GOSSIP_PACKET_SIZE - dos.size() < estimate )
-            {
-                logger_.info("@@@@ Breaking out to respect the MTU size in GD @@@@");
-                bVal = false;
-                break;
-            }
-            int pre = dos.size();               
-            GossipDigest.serializer().serialize( gDigest, dos );
-            int post = dos.size();
-            estimate = post - pre;
-        }
-        return bVal;
-    }
-
-    static List<GossipDigest> deserialize(DataInputStream dis) throws IOException
-    {
-        int size = dis.readInt();            
-        List<GossipDigest> gDigests = new ArrayList<GossipDigest>();
-        
-        for ( int i = 0; i < size; ++i )
-        {
-            if ( dis.available() == 0 )
-            {
-                logger_.info("Remaining bytes zero. Stopping deserialization of GossipDigests.");
-                break;
-            }
-                            
-            GossipDigest gDigest = GossipDigest.serializer().deserialize(dis);                
-            gDigests.add( gDigest );                
-        }        
-        return gDigests;
-    }
-}
-
-class EndPointStatesSerializationHelper
-{
-    private static Log4jLogger logger_ = new Log4jLogger(EndPointStatesSerializationHelper.class.getName());
-    
-    static boolean serialize(Map<EndPoint, EndPointState> epStateMap, DataOutputStream dos) throws IOException
-    {
-        boolean bVal = true;
-        int estimate = 0;                
-        int size = epStateMap.size();
-        dos.writeInt(size);
-    
-        Set<EndPoint> eps = epStateMap.keySet();
-        for( EndPoint ep : eps )
-        {
-            if ( Gossiper.MAX_GOSSIP_PACKET_SIZE - dos.size() < estimate )
-            {
-                logger_.info("@@@@ Breaking out to respect the MTU size in EPS. Estimate is " + estimate + " @@@@");
-                bVal = false;
-                break;
-            }
-    
-            int pre = dos.size();
-            CompactEndPointSerializationHelper.serialize(ep, dos);
-            EndPointState epState = epStateMap.get(ep);            
-            EndPointState.serializer().serialize(epState, dos);
-            int post = dos.size();
-            estimate = post - pre;
-        }
-        return bVal;
-    }
-
-    static Map<EndPoint, EndPointState> deserialize(DataInputStream dis) throws IOException
-    {
-        int size = dis.readInt();            
-        Map<EndPoint, EndPointState> epStateMap = new HashMap<EndPoint, EndPointState>();
-        
-        for ( int i = 0; i < size; ++i )
-        {
-            if ( dis.available() == 0 )
-            {
-                logger_.info("Remaining bytes zero. Stopping deserialization in EndPointState.");
-                break;
-            }
-            // int length = dis.readInt();            
-            EndPoint ep = CompactEndPointSerializationHelper.deserialize(dis);
-            EndPointState epState = EndPointState.serializer().deserialize(dis);            
-            epStateMap.put(ep, epState);
-        }        
-        return epStateMap;
-    }
-}
-
-class GossipDigestSynMessageSerializer implements ICompactSerializer<GossipDigestSynMessage>
-{   
-    public void serialize(GossipDigestSynMessage gDigestSynMessage, DataOutputStream dos) throws IOException
-    {    
-        dos.writeUTF(gDigestSynMessage.clusterId_);
-        GossipDigestSerializationHelper.serialize(gDigestSynMessage.gDigests_, dos);
-    }
-
-    public GossipDigestSynMessage deserialize(DataInputStream dis) throws IOException
-    {
-        String clusterId = dis.readUTF();
-        List<GossipDigest> gDigests = GossipDigestSerializationHelper.deserialize(dis);
-        return new GossipDigestSynMessage(clusterId, gDigests);
-    }
-
-}
-
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.gms;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.*;
+
+import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.net.CompactEndPointSerializationHelper;
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.utils.Log4jLogger;
+import org.apache.log4j.Logger;
+import org.apache.cassandra.utils.*;
+
+
+/**
+ * This is the first message that gets sent out as a start of the Gossip protocol in a 
+ * round. 
+ * 
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+class GossipDigestSynMessage
+{
+    private static ICompactSerializer<GossipDigestSynMessage> serializer_;
+    static
+    {
+        serializer_ = new GossipDigestSynMessageSerializer();
+    }
+    
+    String clusterId_;
+    List<GossipDigest> gDigests_ = new ArrayList<GossipDigest>();
+
+    public static ICompactSerializer<GossipDigestSynMessage> serializer()
+    {
+        return serializer_;
+    }
+ 
+    public GossipDigestSynMessage(String clusterId, List<GossipDigest> gDigests)
+    {      
+        clusterId_ = clusterId;
+        gDigests_ = gDigests;
+    }
+    
+    List<GossipDigest> getGossipDigests()
+    {
+        return gDigests_;
+    }
+}
+
+class GossipDigestSerializationHelper
+{
+    private static Logger logger_ = Logger.getLogger(GossipDigestSerializationHelper.class);
+    
+    static boolean serialize(List<GossipDigest> gDigestList, DataOutputStream dos) throws IOException
+    {
+        boolean bVal = true;
+        int size = gDigestList.size();                        
+        dos.writeInt(size);
+        
+        int estimate = 0;            
+        for ( GossipDigest gDigest : gDigestList )
+        {
+            if ( Gossiper.MAX_GOSSIP_PACKET_SIZE - dos.size() < estimate )
+            {
+                logger_.info("@@@@ Breaking out to respect the MTU size in GD @@@@");
+                bVal = false;
+                break;
+            }
+            int pre = dos.size();               
+            GossipDigest.serializer().serialize( gDigest, dos );
+            int post = dos.size();
+            estimate = post - pre;
+        }
+        return bVal;
+    }
+
+    static List<GossipDigest> deserialize(DataInputStream dis) throws IOException
+    {
+        int size = dis.readInt();            
+        List<GossipDigest> gDigests = new ArrayList<GossipDigest>();
+        
+        for ( int i = 0; i < size; ++i )
+        {
+            if ( dis.available() == 0 )
+            {
+                logger_.info("Remaining bytes zero. Stopping deserialization of GossipDigests.");
+                break;
+            }
+                            
+            GossipDigest gDigest = GossipDigest.serializer().deserialize(dis);                
+            gDigests.add( gDigest );                
+        }        
+        return gDigests;
+    }
+}
+
+class EndPointStatesSerializationHelper
+{
+    private static Log4jLogger logger_ = new Log4jLogger(EndPointStatesSerializationHelper.class.getName());
+    
+    static boolean serialize(Map<EndPoint, EndPointState> epStateMap, DataOutputStream dos) throws IOException
+    {
+        boolean bVal = true;
+        int estimate = 0;                
+        int size = epStateMap.size();
+        dos.writeInt(size);
+    
+        Set<EndPoint> eps = epStateMap.keySet();
+        for( EndPoint ep : eps )
+        {
+            if ( Gossiper.MAX_GOSSIP_PACKET_SIZE - dos.size() < estimate )
+            {
+                logger_.info("@@@@ Breaking out to respect the MTU size in EPS. Estimate is " + estimate + " @@@@");
+                bVal = false;
+                break;
+            }
+    
+            int pre = dos.size();
+            CompactEndPointSerializationHelper.serialize(ep, dos);
+            EndPointState epState = epStateMap.get(ep);            
+            EndPointState.serializer().serialize(epState, dos);
+            int post = dos.size();
+            estimate = post - pre;
+        }
+        return bVal;
+    }
+
+    static Map<EndPoint, EndPointState> deserialize(DataInputStream dis) throws IOException
+    {
+        int size = dis.readInt();            
+        Map<EndPoint, EndPointState> epStateMap = new HashMap<EndPoint, EndPointState>();
+        
+        for ( int i = 0; i < size; ++i )
+        {
+            if ( dis.available() == 0 )
+            {
+                logger_.info("Remaining bytes zero. Stopping deserialization in EndPointState.");
+                break;
+            }
+            // int length = dis.readInt();            
+            EndPoint ep = CompactEndPointSerializationHelper.deserialize(dis);
+            EndPointState epState = EndPointState.serializer().deserialize(dis);            
+            epStateMap.put(ep, epState);
+        }        
+        return epStateMap;
+    }
+}
+
+class GossipDigestSynMessageSerializer implements ICompactSerializer<GossipDigestSynMessage>
+{   
+    public void serialize(GossipDigestSynMessage gDigestSynMessage, DataOutputStream dos) throws IOException
+    {    
+        dos.writeUTF(gDigestSynMessage.clusterId_);
+        GossipDigestSerializationHelper.serialize(gDigestSynMessage.gDigests_, dos);
+    }
+
+    public GossipDigestSynMessage deserialize(DataInputStream dis) throws IOException
+    {
+        String clusterId = dis.readUTF();
+        List<GossipDigest> gDigests = GossipDigestSerializationHelper.deserialize(dis);
+        return new GossipDigestSynMessage(clusterId, gDigests);
+    }
+
+}
+



Mime
View raw message