cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbel...@apache.org
Subject svn commit: r799331 [11/29] - in /incubator/cassandra/trunk: ./ src/java/org/apache/cassandra/concurrent/ src/java/org/apache/cassandra/config/ src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/dht/ src/java/org/apache/cassandra/gms/ src/...
Date Thu, 30 Jul 2009 15:30:27 GMT
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapInitiateMessage.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapInitiateMessage.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapInitiateMessage.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapInitiateMessage.java Thu Jul 30 15:30:21 2009
@@ -1,99 +1,99 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.dht;
-
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.Serializable;
-
-import org.apache.cassandra.db.ColumnFamily;
-import org.apache.cassandra.io.ICompactSerializer;
-import org.apache.cassandra.net.Message;
-import org.apache.cassandra.net.io.StreamContextManager;
-import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.net.io.*;
-
-/**
- * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
- */
-
-public class BootstrapInitiateMessage implements Serializable
-{
-    private static ICompactSerializer<BootstrapInitiateMessage> serializer_;
-    
-    static
-    {
-        serializer_ = new BootstrapInitiateMessageSerializer();
-    }
-    
-    public static ICompactSerializer<BootstrapInitiateMessage> serializer()
-    {
-        return serializer_;
-    }
-    
-    public static Message makeBootstrapInitiateMessage(BootstrapInitiateMessage biMessage) throws IOException
-    {
-        ByteArrayOutputStream bos = new ByteArrayOutputStream();
-        DataOutputStream dos = new DataOutputStream( bos );
-        BootstrapInitiateMessage.serializer().serialize(biMessage, dos);
-        return new Message( StorageService.getLocalStorageEndPoint(), "", StorageService.bootStrapInitiateVerbHandler_, bos.toByteArray() );
-    }
-    
-    protected StreamContextManager.StreamContext[] streamContexts_ = new StreamContextManager.StreamContext[0];
-   
-    public BootstrapInitiateMessage(StreamContextManager.StreamContext[] streamContexts)
-    {
-        streamContexts_ = streamContexts;
-    }
-    
-    public StreamContextManager.StreamContext[] getStreamContext()
-    {
-        return streamContexts_;
-    }
-}
-
-class BootstrapInitiateMessageSerializer implements ICompactSerializer<BootstrapInitiateMessage>
-{
-    public void serialize(BootstrapInitiateMessage bim, DataOutputStream dos) throws IOException
-    {
-        dos.writeInt(bim.streamContexts_.length);
-        for ( StreamContextManager.StreamContext streamContext : bim.streamContexts_ )
-        {
-            StreamContextManager.StreamContext.serializer().serialize(streamContext, dos);
-        }
-    }
-    
-    public BootstrapInitiateMessage deserialize(DataInputStream dis) throws IOException
-    {
-        int size = dis.readInt();
-        StreamContextManager.StreamContext[] streamContexts = new StreamContextManager.StreamContext[0];
-        if ( size > 0 )
-        {
-            streamContexts = new StreamContextManager.StreamContext[size];
-            for ( int i = 0; i < size; ++i )
-            {
-                streamContexts[i] = StreamContextManager.StreamContext.serializer().deserialize(dis);
-            }
-        }
-        return new BootstrapInitiateMessage(streamContexts);
-    }
-}
-
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.dht;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+
+import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.io.StreamContextManager;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.net.io.*;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class BootstrapInitiateMessage implements Serializable
+{
+    private static ICompactSerializer<BootstrapInitiateMessage> serializer_;
+    
+    static
+    {
+        serializer_ = new BootstrapInitiateMessageSerializer();
+    }
+    
+    public static ICompactSerializer<BootstrapInitiateMessage> serializer()
+    {
+        return serializer_;
+    }
+    
+    public static Message makeBootstrapInitiateMessage(BootstrapInitiateMessage biMessage) throws IOException
+    {
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        DataOutputStream dos = new DataOutputStream( bos );
+        BootstrapInitiateMessage.serializer().serialize(biMessage, dos);
+        return new Message( StorageService.getLocalStorageEndPoint(), "", StorageService.bootStrapInitiateVerbHandler_, bos.toByteArray() );
+    }
+    
+    protected StreamContextManager.StreamContext[] streamContexts_ = new StreamContextManager.StreamContext[0];
+   
+    public BootstrapInitiateMessage(StreamContextManager.StreamContext[] streamContexts)
+    {
+        streamContexts_ = streamContexts;
+    }
+    
+    public StreamContextManager.StreamContext[] getStreamContext()
+    {
+        return streamContexts_;
+    }
+}
+
+class BootstrapInitiateMessageSerializer implements ICompactSerializer<BootstrapInitiateMessage>
+{
+    public void serialize(BootstrapInitiateMessage bim, DataOutputStream dos) throws IOException
+    {
+        dos.writeInt(bim.streamContexts_.length);
+        for ( StreamContextManager.StreamContext streamContext : bim.streamContexts_ )
+        {
+            StreamContextManager.StreamContext.serializer().serialize(streamContext, dos);
+        }
+    }
+    
+    public BootstrapInitiateMessage deserialize(DataInputStream dis) throws IOException
+    {
+        int size = dis.readInt();
+        StreamContextManager.StreamContext[] streamContexts = new StreamContextManager.StreamContext[0];
+        if ( size > 0 )
+        {
+            streamContexts = new StreamContextManager.StreamContext[size];
+            for ( int i = 0; i < size; ++i )
+            {
+                streamContexts[i] = StreamContextManager.StreamContext.serializer().deserialize(dis);
+            }
+        }
+        return new BootstrapInitiateMessage(streamContexts);
+    }
+}
+

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadata.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadata.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadata.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadata.java Thu Jul 30 15:30:21 2009
@@ -1,102 +1,102 @@
- /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.dht;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.cassandra.io.ICompactSerializer;
-import org.apache.cassandra.net.CompactEndPointSerializationHelper;
-import org.apache.cassandra.net.EndPoint;
-
-
-
-/**
- * This encapsulates information of the list of 
- * ranges that a target node requires in order to 
- * be bootstrapped. This will be bundled in a 
- * BootstrapMetadataMessage and sent to nodes that
- * are going to handoff the data.
-*/
-class BootstrapMetadata
-{
-    private static ICompactSerializer<BootstrapMetadata> serializer_;
-    static
-    {
-        serializer_ = new BootstrapMetadataSerializer();
-    }
-    
-    protected static ICompactSerializer<BootstrapMetadata> serializer()
-    {
-        return serializer_;
-    }
-    
-    protected EndPoint target_;
-    protected List<Range> ranges_;
-    
-    BootstrapMetadata(EndPoint target, List<Range> ranges)
-    {
-        target_ = target;
-        ranges_ = ranges;
-    }
-    
-    public String toString()
-    {
-        StringBuilder sb = new StringBuilder("");
-        sb.append(target_);
-        sb.append("------->");
-        for ( Range range : ranges_ )
-        {
-            sb.append(range);
-            sb.append(" ");
-        }
-        return sb.toString();
-    }
-}
-
-class BootstrapMetadataSerializer implements ICompactSerializer<BootstrapMetadata>
-{
-    public void serialize(BootstrapMetadata bsMetadata, DataOutputStream dos) throws IOException
-    {
-        CompactEndPointSerializationHelper.serialize(bsMetadata.target_, dos);
-        int size = (bsMetadata.ranges_ == null) ? 0 : bsMetadata.ranges_.size();            
-        dos.writeInt(size);
-        
-        for ( Range range : bsMetadata.ranges_ )
-        {
-            Range.serializer().serialize(range, dos);
-        }            
-    }
-
-    public BootstrapMetadata deserialize(DataInputStream dis) throws IOException
-    {            
-        EndPoint target = CompactEndPointSerializationHelper.deserialize(dis);
-        int size = dis.readInt();
-        List<Range> ranges = (size == 0) ? null : new ArrayList<Range>();
-        for( int i = 0; i < size; ++i )
-        {
-            ranges.add(Range.serializer().deserialize(dis));
-        }            
-        return new BootstrapMetadata( target, ranges );
-    }
-}
-
+ /**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.dht;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.net.CompactEndPointSerializationHelper;
+import org.apache.cassandra.net.EndPoint;
+
+
+
+/**
+ * This encapsulates information of the list of 
+ * ranges that a target node requires in order to 
+ * be bootstrapped. This will be bundled in a 
+ * BootstrapMetadataMessage and sent to nodes that
+ * are going to handoff the data.
+*/
+class BootstrapMetadata
+{
+    private static ICompactSerializer<BootstrapMetadata> serializer_;
+    static
+    {
+        serializer_ = new BootstrapMetadataSerializer();
+    }
+    
+    protected static ICompactSerializer<BootstrapMetadata> serializer()
+    {
+        return serializer_;
+    }
+    
+    protected EndPoint target_;
+    protected List<Range> ranges_;
+    
+    BootstrapMetadata(EndPoint target, List<Range> ranges)
+    {
+        target_ = target;
+        ranges_ = ranges;
+    }
+    
+    public String toString()
+    {
+        StringBuilder sb = new StringBuilder("");
+        sb.append(target_);
+        sb.append("------->");
+        for ( Range range : ranges_ )
+        {
+            sb.append(range);
+            sb.append(" ");
+        }
+        return sb.toString();
+    }
+}
+
+class BootstrapMetadataSerializer implements ICompactSerializer<BootstrapMetadata>
+{
+    public void serialize(BootstrapMetadata bsMetadata, DataOutputStream dos) throws IOException
+    {
+        CompactEndPointSerializationHelper.serialize(bsMetadata.target_, dos);
+        int size = (bsMetadata.ranges_ == null) ? 0 : bsMetadata.ranges_.size();            
+        dos.writeInt(size);
+        
+        for ( Range range : bsMetadata.ranges_ )
+        {
+            Range.serializer().serialize(range, dos);
+        }            
+    }
+
+    public BootstrapMetadata deserialize(DataInputStream dis) throws IOException
+    {            
+        EndPoint target = CompactEndPointSerializationHelper.deserialize(dis);
+        int size = dis.readInt();
+        List<Range> ranges = (size == 0) ? null : new ArrayList<Range>();
+        for( int i = 0; i < size; ++i )
+        {
+            ranges.add(Range.serializer().deserialize(dis));
+        }            
+        return new BootstrapMetadata( target, ranges );
+    }
+}
+

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataMessage.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataMessage.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataMessage.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataMessage.java Thu Jul 30 15:30:21 2009
@@ -1,90 +1,90 @@
- /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.dht;
-
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-
-import org.apache.cassandra.io.ICompactSerializer;
-import org.apache.cassandra.net.Message;
-import org.apache.cassandra.service.StorageService;
-
-
-
-/**
- * This class encapsulates the message that needs to be sent
- * to nodes that handoff data. The message contains information
- * about the node to be bootstrapped and the ranges with which
- * it needs to be bootstrapped.
-*/
-class BootstrapMetadataMessage
-{
-    private static ICompactSerializer<BootstrapMetadataMessage> serializer_;
-    static
-    {
-        serializer_ = new BootstrapMetadataMessageSerializer();
-    }
-    
-    protected static ICompactSerializer<BootstrapMetadataMessage> serializer()
-    {
-        return serializer_;
-    }
-    
-    protected static Message makeBootstrapMetadataMessage(BootstrapMetadataMessage bsMetadataMessage) throws IOException
-    {
-        ByteArrayOutputStream bos = new ByteArrayOutputStream();
-        DataOutputStream dos = new DataOutputStream( bos );
-        BootstrapMetadataMessage.serializer().serialize(bsMetadataMessage, dos);
-        return new Message( StorageService.getLocalStorageEndPoint(), "", StorageService.bsMetadataVerbHandler_, bos.toByteArray() );
-    }        
-    
-    protected BootstrapMetadata[] bsMetadata_ = new BootstrapMetadata[0];
-    
-    BootstrapMetadataMessage(BootstrapMetadata[] bsMetadata)
-    {
-        bsMetadata_ = bsMetadata;
-    }
-}
-
-class BootstrapMetadataMessageSerializer implements ICompactSerializer<BootstrapMetadataMessage>
-{
-    public void serialize(BootstrapMetadataMessage bsMetadataMessage, DataOutputStream dos) throws IOException
-    {
-        BootstrapMetadata[] bsMetadata = bsMetadataMessage.bsMetadata_;
-        int size = (bsMetadata == null) ? 0 : bsMetadata.length;
-        dos.writeInt(size);
-        for ( BootstrapMetadata bsmd : bsMetadata )
-        {
-            BootstrapMetadata.serializer().serialize(bsmd, dos);
-        }
-    }
-
-    public BootstrapMetadataMessage deserialize(DataInputStream dis) throws IOException
-    {            
-        int size = dis.readInt();
-        BootstrapMetadata[] bsMetadata = new BootstrapMetadata[size];
-        for ( int i = 0; i < size; ++i )
-        {
-            bsMetadata[i] = BootstrapMetadata.serializer().deserialize(dis);
-        }
-        return new BootstrapMetadataMessage(bsMetadata);
-    }
-}
+ /**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.dht;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.service.StorageService;
+
+
+
+/**
+ * This class encapsulates the message that needs to be sent
+ * to nodes that handoff data. The message contains information
+ * about the node to be bootstrapped and the ranges with which
+ * it needs to be bootstrapped.
+*/
+class BootstrapMetadataMessage
+{
+    private static ICompactSerializer<BootstrapMetadataMessage> serializer_;
+    static
+    {
+        serializer_ = new BootstrapMetadataMessageSerializer();
+    }
+    
+    protected static ICompactSerializer<BootstrapMetadataMessage> serializer()
+    {
+        return serializer_;
+    }
+    
+    protected static Message makeBootstrapMetadataMessage(BootstrapMetadataMessage bsMetadataMessage) throws IOException
+    {
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        DataOutputStream dos = new DataOutputStream( bos );
+        BootstrapMetadataMessage.serializer().serialize(bsMetadataMessage, dos);
+        return new Message( StorageService.getLocalStorageEndPoint(), "", StorageService.bsMetadataVerbHandler_, bos.toByteArray() );
+    }        
+    
+    protected BootstrapMetadata[] bsMetadata_ = new BootstrapMetadata[0];
+    
+    BootstrapMetadataMessage(BootstrapMetadata[] bsMetadata)
+    {
+        bsMetadata_ = bsMetadata;
+    }
+}
+
+class BootstrapMetadataMessageSerializer implements ICompactSerializer<BootstrapMetadataMessage>
+{
+    public void serialize(BootstrapMetadataMessage bsMetadataMessage, DataOutputStream dos) throws IOException
+    {
+        BootstrapMetadata[] bsMetadata = bsMetadataMessage.bsMetadata_;
+        int size = (bsMetadata == null) ? 0 : bsMetadata.length;
+        dos.writeInt(size);
+        for ( BootstrapMetadata bsmd : bsMetadata )
+        {
+            BootstrapMetadata.serializer().serialize(bsmd, dos);
+        }
+    }
+
+    public BootstrapMetadataMessage deserialize(DataInputStream dis) throws IOException
+    {            
+        int size = dis.readInt();
+        BootstrapMetadata[] bsMetadata = new BootstrapMetadata[size];
+        for ( int i = 0; i < size; ++i )
+        {
+            bsMetadata[i] = BootstrapMetadata.serializer().deserialize(dis);
+        }
+        return new BootstrapMetadataMessage(bsMetadata);
+    }
+}

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataVerbHandler.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataVerbHandler.java Thu Jul 30 15:30:21 2009
@@ -1,173 +1,173 @@
- /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.dht;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.Table;
-import org.apache.cassandra.io.DataInputBuffer;
-import org.apache.cassandra.net.EndPoint;
-import org.apache.cassandra.net.IVerbHandler;
-import org.apache.cassandra.net.Message;
-import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.net.io.StreamContextManager;
-import org.apache.cassandra.service.StreamManager;
-import org.apache.cassandra.utils.LogUtil;
-import org.apache.log4j.Logger;
-
-/**
- * This verb handler handles the BootstrapMetadataMessage that is sent
- * by the leader to the nodes that are responsible for handing off data. 
-*/
-public class BootstrapMetadataVerbHandler implements IVerbHandler
-{
-    private static Logger logger_ = Logger.getLogger(BootstrapMetadataVerbHandler.class);
-    
-    public void doVerb(Message message)
-    {
-        if (logger_.isDebugEnabled())
-          logger_.debug("Received a BootstrapMetadataMessage from " + message.getFrom());
-        byte[] body = message.getMessageBody();
-        DataInputBuffer bufIn = new DataInputBuffer();
-        bufIn.reset(body, body.length);
-        try
-        {
-            BootstrapMetadataMessage bsMetadataMessage = BootstrapMetadataMessage.serializer().deserialize(bufIn);
-            BootstrapMetadata[] bsMetadata = bsMetadataMessage.bsMetadata_;
-            
-            /*
-             * This is for debugging purposes. Remove later.
-            */
-            for ( BootstrapMetadata bsmd : bsMetadata )
-            {
-                if (logger_.isDebugEnabled())
-                  logger_.debug(bsmd.toString());                                      
-            }
-            
-            for ( BootstrapMetadata bsmd : bsMetadata )
-            {
-                long startTime = System.currentTimeMillis();
-                doTransfer(bsmd.target_, bsmd.ranges_);     
-                if (logger_.isDebugEnabled())
-                  logger_.debug("Time taken to boostrap " + 
-                        bsmd.target_ + 
-                        " is " + 
-                        (System.currentTimeMillis() - startTime) +
-                        " msecs.");
-            }
-        }
-        catch ( IOException ex )
-        {
-            logger_.info(LogUtil.throwableToString(ex));
-        }
-    }
-    
-    /*
-     * This method needs to figure out the files on disk
-     * locally for each range and then stream them using
-     * the Bootstrap protocol to the target endpoint.
-    */
-    private void doTransfer(EndPoint target, List<Range> ranges) throws IOException
-    {
-        if ( ranges.size() == 0 )
-        {
-            if (logger_.isDebugEnabled())
-              logger_.debug("No ranges to give scram ...");
-            return;
-        }
-        
-        /* Just for debugging process - remove later */            
-        for ( Range range : ranges )
-        {
-            StringBuilder sb = new StringBuilder("");                
-            sb.append(range.toString());
-            sb.append(" ");            
-            if (logger_.isDebugEnabled())
-              logger_.debug("Beginning transfer process to " + target + " for ranges " + sb.toString());                
-        }
-      
-        /*
-         * (1) First we dump all the memtables to disk.
-         * (2) Run a version of compaction which will basically
-         *     put the keys in the range specified into a directory
-         *     named as per the endpoint it is destined for inside the
-         *     bootstrap directory.
-         * (3) Handoff the data.
-        */
-        List<String> tables = DatabaseDescriptor.getTables();
-        for ( String tName : tables )
-        {
-            Table table = Table.open(tName);
-            if (logger_.isDebugEnabled())
-              logger_.debug("Flushing memtables ...");
-            table.flush(false);
-            if (logger_.isDebugEnabled())
-              logger_.debug("Forcing compaction ...");
-            /* Get the counting bloom filter for each endpoint and the list of files that need to be streamed */
-            List<String> fileList = new ArrayList<String>();
-            boolean bVal = table.forceCompaction(ranges, target, fileList);                
-            doHandoff(target, fileList, tName);
-        }
-    }
-
-    /**
-     * Stream the files in the bootstrap directory over to the
-     * node being bootstrapped.
-    */
-    private void doHandoff(EndPoint target, List<String> fileList, String table) throws IOException
-    {
-        List<File> filesList = new ArrayList<File>();
-        for(String file : fileList)
-        {
-            filesList.add(new File(file));
-        }
-        File[] files = filesList.toArray(new File[0]);
-        StreamContextManager.StreamContext[] streamContexts = new StreamContextManager.StreamContext[files.length];
-        int i = 0;
-        for ( File file : files )
-        {
-            streamContexts[i] = new StreamContextManager.StreamContext(file.getAbsolutePath(), file.length(), table);
-            if (logger_.isDebugEnabled())
-              logger_.debug("Stream context metadata " + streamContexts[i]);
-            ++i;
-        }
-        
-        if ( files.length > 0 )
-        {
-            /* Set up the stream manager with the files that need to streamed */
-            StreamManager.instance(target).addFilesToStream(streamContexts);
-            /* Send the bootstrap initiate message */
-            BootstrapInitiateMessage biMessage = new BootstrapInitiateMessage(streamContexts);
-            Message message = BootstrapInitiateMessage.makeBootstrapInitiateMessage(biMessage);
-            if (logger_.isDebugEnabled())
-              logger_.debug("Sending a bootstrap initiate message to " + target + " ...");
-            MessagingService.getMessagingInstance().sendOneWay(message, target);                
-            if (logger_.isDebugEnabled())
-              logger_.debug("Waiting for transfer to " + target + " to complete");
-            StreamManager.instance(target).waitForStreamCompletion();
-            if (logger_.isDebugEnabled())
-              logger_.debug("Done with transfer to " + target);  
-        }
-    }
-}
-
+ /**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.dht;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.io.DataInputBuffer;
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.net.IVerbHandler;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.net.io.StreamContextManager;
+import org.apache.cassandra.service.StreamManager;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+
+/**
+ * This verb handler handles the BootstrapMetadataMessage that is sent
+ * by the leader to the nodes that are responsible for handing off data. 
+*/
+public class BootstrapMetadataVerbHandler implements IVerbHandler
+{
+    private static Logger logger_ = Logger.getLogger(BootstrapMetadataVerbHandler.class);
+    
+    public void doVerb(Message message)
+    {
+        if (logger_.isDebugEnabled())
+          logger_.debug("Received a BootstrapMetadataMessage from " + message.getFrom());
+        byte[] body = message.getMessageBody();
+        DataInputBuffer bufIn = new DataInputBuffer();
+        bufIn.reset(body, body.length);
+        try
+        {
+            BootstrapMetadataMessage bsMetadataMessage = BootstrapMetadataMessage.serializer().deserialize(bufIn);
+            BootstrapMetadata[] bsMetadata = bsMetadataMessage.bsMetadata_;
+            
+            /*
+             * This is for debugging purposes. Remove later.
+            */
+            for ( BootstrapMetadata bsmd : bsMetadata )
+            {
+                if (logger_.isDebugEnabled())
+                  logger_.debug(bsmd.toString());                                      
+            }
+            
+            for ( BootstrapMetadata bsmd : bsMetadata )
+            {
+                long startTime = System.currentTimeMillis();
+                doTransfer(bsmd.target_, bsmd.ranges_);     
+                if (logger_.isDebugEnabled())
+                  logger_.debug("Time taken to boostrap " + 
+                        bsmd.target_ + 
+                        " is " + 
+                        (System.currentTimeMillis() - startTime) +
+                        " msecs.");
+            }
+        }
+        catch ( IOException ex )
+        {
+            logger_.info(LogUtil.throwableToString(ex));
+        }
+    }
+    
+    /*
+     * This method needs to figure out the files on disk
+     * locally for each range and then stream them using
+     * the Bootstrap protocol to the target endpoint.
+    */
+    private void doTransfer(EndPoint target, List<Range> ranges) throws IOException
+    {
+        if ( ranges.size() == 0 )
+        {
+            if (logger_.isDebugEnabled())
+              logger_.debug("No ranges to give scram ...");
+            return;
+        }
+        
+        /* Just for debugging purposes - remove later */
+        for ( Range range : ranges )
+        {
+            StringBuilder sb = new StringBuilder("");                
+            sb.append(range.toString());
+            sb.append(" ");            
+            if (logger_.isDebugEnabled())
+              logger_.debug("Beginning transfer process to " + target + " for ranges " + sb.toString());                
+        }
+      
+        /*
+         * (1) First we dump all the memtables to disk.
+         * (2) Run a version of compaction which will basically
+         *     put the keys in the range specified into a directory
+         *     named as per the endpoint it is destined for inside the
+         *     bootstrap directory.
+         * (3) Handoff the data.
+        */
+        List<String> tables = DatabaseDescriptor.getTables();
+        for ( String tName : tables )
+        {
+            Table table = Table.open(tName);
+            if (logger_.isDebugEnabled())
+              logger_.debug("Flushing memtables ...");
+            table.flush(false);
+            if (logger_.isDebugEnabled())
+              logger_.debug("Forcing compaction ...");
+            /* Get the counting bloom filter for each endpoint and the list of files that need to be streamed */
+            List<String> fileList = new ArrayList<String>();
+            boolean bVal = table.forceCompaction(ranges, target, fileList);                
+            doHandoff(target, fileList, tName);
+        }
+    }
+
+    /**
+     * Stream the files in the bootstrap directory over to the
+     * node being bootstrapped.
+    */
+    private void doHandoff(EndPoint target, List<String> fileList, String table) throws IOException
+    {
+        List<File> filesList = new ArrayList<File>();
+        for(String file : fileList)
+        {
+            filesList.add(new File(file));
+        }
+        File[] files = filesList.toArray(new File[0]);
+        StreamContextManager.StreamContext[] streamContexts = new StreamContextManager.StreamContext[files.length];
+        int i = 0;
+        for ( File file : files )
+        {
+            streamContexts[i] = new StreamContextManager.StreamContext(file.getAbsolutePath(), file.length(), table);
+            if (logger_.isDebugEnabled())
+              logger_.debug("Stream context metadata " + streamContexts[i]);
+            ++i;
+        }
+        
+        if ( files.length > 0 )
+        {
+            /* Set up the stream manager with the files that need to be streamed */
+            StreamManager.instance(target).addFilesToStream(streamContexts);
+            /* Send the bootstrap initiate message */
+            BootstrapInitiateMessage biMessage = new BootstrapInitiateMessage(streamContexts);
+            Message message = BootstrapInitiateMessage.makeBootstrapInitiateMessage(biMessage);
+            if (logger_.isDebugEnabled())
+              logger_.debug("Sending a bootstrap initiate message to " + target + " ...");
+            MessagingService.getMessagingInstance().sendOneWay(message, target);                
+            if (logger_.isDebugEnabled())
+              logger_.debug("Waiting for transfer to " + target + " to complete");
+            StreamManager.instance(target).waitForStreamCompletion();
+            if (logger_.isDebugEnabled())
+              logger_.debug("Done with transfer to " + target);  
+        }
+    }
+}
+

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapSourceTarget.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapSourceTarget.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapSourceTarget.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapSourceTarget.java Thu Jul 30 15:30:21 2009
@@ -1,49 +1,49 @@
- /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.dht;
-
-import org.apache.cassandra.net.EndPoint;
-
-/**
- * This class encapsulates who is the source and the
- * target of a bootstrap for a particular range.
- */
-class BootstrapSourceTarget
-{
-    protected EndPoint source_;
-    protected EndPoint target_;
-    
-    BootstrapSourceTarget(EndPoint source, EndPoint target)
-    {
-        source_ = source;
-        target_ = target;
-    }
-    
-    public String toString()
-    {
-        StringBuilder sb = new StringBuilder("");
-        sb.append("SOURCE: ");
-        sb.append(source_);
-        sb.append(" ----> ");
-        sb.append("TARGET: ");
-        sb.append(target_);
-        sb.append(" ");
-        return sb.toString();
-    }
-}
+ /**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.dht;
+
+import org.apache.cassandra.net.EndPoint;
+
+/**
+ * This class encapsulates who is the source and the
+ * target of a bootstrap for a particular range.
+ */
+class BootstrapSourceTarget
+{
+    protected EndPoint source_;
+    protected EndPoint target_;
+    
+    BootstrapSourceTarget(EndPoint source, EndPoint target)
+    {
+        source_ = source;
+        target_ = target;
+    }
+    
+    public String toString()
+    {
+        StringBuilder sb = new StringBuilder("");
+        sb.append("SOURCE: ");
+        sb.append(source_);
+        sb.append(" ----> ");
+        sb.append("TARGET: ");
+        sb.append(target_);
+        sb.append(" ");
+        return sb.toString();
+    }
+}

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/IPartitioner.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/IPartitioner.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/IPartitioner.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/IPartitioner.java Thu Jul 30 15:30:21 2009
@@ -1,48 +1,48 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.dht;
-
-import java.util.Comparator;
-
-public interface IPartitioner
-{
-    /**
-     * transform key to on-disk format s.t. keys are stored in node comparison order.
-     * this lets bootstrap rip out parts of the sstable sequentially instead of doing random seeks.
-     *
-     * @param key the raw, client-facing key
-     * @return decorated on-disk version of key
-     */
-    public String decorateKey(String key);
-
-    public String undecorateKey(String decoratedKey);
-
-    public Comparator<String> getDecoratedKeyComparator();
-
-    public Comparator<String> getReverseDecoratedKeyComparator();
-
-    /**
-     * @return the token to use for this node if none was saved
-     */
-    public Token getInitialToken(String key);
-
-    public Token getDefaultToken();
-
-    public Token.TokenFactory getTokenFactory();
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.dht;
+
+import java.util.Comparator;
+
+public interface IPartitioner
+{
+    /**
+     * transform key to on-disk format s.t. keys are stored in node comparison order.
+     * this lets bootstrap rip out parts of the sstable sequentially instead of doing random seeks.
+     *
+     * @param key the raw, client-facing key
+     * @return decorated on-disk version of key
+     */
+    public String decorateKey(String key);
+
+    public String undecorateKey(String decoratedKey);
+
+    public Comparator<String> getDecoratedKeyComparator();
+
+    public Comparator<String> getReverseDecoratedKeyComparator();
+
+    /**
+     * @return the token to use for this node if none was saved
+     */
+    public Token getInitialToken(String key);
+
+    public Token getDefaultToken();
+
+    public Token.TokenFactory getTokenFactory();
+}

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/LeaveJoinProtocolHelper.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/LeaveJoinProtocolHelper.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/LeaveJoinProtocolHelper.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/LeaveJoinProtocolHelper.java Thu Jul 30 15:30:21 2009
@@ -1,229 +1,229 @@
- /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.dht;
-
- import java.io.IOException;
- import java.util.ArrayList;
- import java.util.Arrays;
- import java.util.HashMap;
- import java.util.List;
- import java.util.Map;
- import java.util.Set;
-
- import org.apache.log4j.Logger;
-
- import org.apache.cassandra.net.EndPoint;
- import org.apache.cassandra.net.Message;
- import org.apache.cassandra.net.MessagingService;
-
-
-class LeaveJoinProtocolHelper
-{
-    private static Logger logger_ = Logger.getLogger(LeaveJoinProtocolHelper.class);
-    
-    /**
-     * Give a range a-------------b which is being split as
-     * a-----x-----y-----b then we want a mapping from 
-     * (a, b] --> (a, x], (x, y], (y, b] 
-    */
-    protected static Map<Range, List<Range>> getRangeSplitRangeMapping(Range[] oldRanges, Token[] allTokens)
-    {
-        Map<Range, List<Range>> splitRanges = new HashMap<Range, List<Range>>();
-        Token[] tokens = new Token[allTokens.length];
-        System.arraycopy(allTokens, 0, tokens, 0, tokens.length);
-        Arrays.sort(tokens);
-        
-        Range prevRange = null;
-        Token prevToken = null;
-        boolean bVal = false;
-        
-        for ( Range oldRange : oldRanges )
-        {
-            if (bVal)
-            {
-                bVal = false; 
-                List<Range> subRanges = splitRanges.get(prevRange);
-                if ( subRanges != null )
-                    subRanges.add( new Range(prevToken, prevRange.right()) );     
-            }
-            
-            prevRange = oldRange;
-            prevToken = oldRange.left();                
-            for (Token token : tokens)
-            {     
-                List<Range> subRanges = splitRanges.get(oldRange);
-                if ( oldRange.contains(token) )
-                {                        
-                    if ( subRanges == null )
-                    {
-                        subRanges = new ArrayList<Range>();
-                        splitRanges.put(oldRange, subRanges);
-                    }                            
-                    subRanges.add( new Range(prevToken, token) );
-                    prevToken = token;
-                    bVal = true;
-                }
-                else
-                {
-                    if ( bVal )
-                    {
-                        bVal = false;                                                                                
-                        subRanges.add( new Range(prevToken, oldRange.right()) );                            
-                    }
-                }
-            }
-        }
-        /* This is to handle the last range being processed. */
-        if ( bVal )
-        {
-            bVal = false; 
-            List<Range> subRanges = splitRanges.get(prevRange);
-            subRanges.add( new Range(prevToken, prevRange.right()) );                            
-        }
-        return splitRanges;
-    }
-    
-    protected static Map<Range, List<BootstrapSourceTarget>> getRangeSourceTargetInfo(Map<Range, List<EndPoint>> oldRangeToEndPointMap, Map<Range, List<EndPoint>> newRangeToEndPointMap)
-    {
-        Map<Range, List<BootstrapSourceTarget>> rangesWithSourceTarget = new HashMap<Range, List<BootstrapSourceTarget>>();
-        /*
-         * Basically calculate for each range the endpoints handling the
-         * range in the old token set and in the new token set. Whoever
-         * gets bumped out of the top N will have to hand off that range
-         * to the new dude.
-        */
-        Set<Range> oldRangeSet = oldRangeToEndPointMap.keySet();
-        for(Range range : oldRangeSet)
-        {
-            if (logger_.isDebugEnabled())
-              logger_.debug("Attempting to figure out the dudes who are bumped out for " + range + " ...");
-            List<EndPoint> oldEndPoints = oldRangeToEndPointMap.get(range);
-            List<EndPoint> newEndPoints = newRangeToEndPointMap.get(range);
-            if ( newEndPoints != null )
-            {                        
-                List<EndPoint> newEndPoints2 = new ArrayList<EndPoint>(newEndPoints);
-                for ( EndPoint newEndPoint : newEndPoints2 )
-                {
-                    if ( oldEndPoints.contains(newEndPoint) )
-                    {
-                        oldEndPoints.remove(newEndPoint);
-                        newEndPoints.remove(newEndPoint);
-                    }
-                }                        
-            }
-            else
-            {
-                logger_.warn("Trespassing - scram");
-            }
-            if (logger_.isDebugEnabled())
-              logger_.debug("Done figuring out the dudes who are bumped out for range " + range + " ...");
-        }
-        for ( Range range : oldRangeSet )
-        {                    
-            List<EndPoint> oldEndPoints = oldRangeToEndPointMap.get(range);
-            List<EndPoint> newEndPoints = newRangeToEndPointMap.get(range);
-            List<BootstrapSourceTarget> srcTarget = rangesWithSourceTarget.get(range);
-            if ( srcTarget == null )
-            {
-                srcTarget = new ArrayList<BootstrapSourceTarget>();
-                rangesWithSourceTarget.put(range, srcTarget);
-            }
-            int i = 0;
-            for ( EndPoint oldEndPoint : oldEndPoints )
-            {                        
-                srcTarget.add( new BootstrapSourceTarget(oldEndPoint, newEndPoints.get(i++)) );
-            }
-        }
-        return rangesWithSourceTarget;
-    }
-    
-    /**
-     * This method sends messages out to nodes instructing them 
-     * to stream the specified ranges to specified target nodes. 
-    */
-    protected static void assignWork(Map<Range, List<BootstrapSourceTarget>> rangesWithSourceTarget) throws IOException
-    {
-        assignWork(rangesWithSourceTarget, null);
-    }
-    
-    /**
-     * This method sends messages out to nodes instructing them 
-     * to stream the specified ranges to specified target nodes. 
-    */
-    protected static void assignWork(Map<Range, List<BootstrapSourceTarget>> rangesWithSourceTarget, List<EndPoint> filters) throws IOException
-    {
-        /*
-         * Map whose key is the source node and the value is a map whose key is the
-         * target and value is the list of ranges to be sent to it. 
-        */
-        Map<EndPoint, Map<EndPoint, List<Range>>> rangeInfo = new HashMap<EndPoint, Map<EndPoint, List<Range>>>();
-        Set<Range> ranges = rangesWithSourceTarget.keySet();
-        
-        for ( Range range : ranges )
-        {
-            List<BootstrapSourceTarget> rangeSourceTargets = rangesWithSourceTarget.get(range);
-            for ( BootstrapSourceTarget rangeSourceTarget : rangeSourceTargets )
-            {
-                Map<EndPoint, List<Range>> targetRangeMap = rangeInfo.get(rangeSourceTarget.source_);
-                if ( targetRangeMap == null )
-                {
-                    targetRangeMap = new HashMap<EndPoint, List<Range>>();
-                    rangeInfo.put(rangeSourceTarget.source_, targetRangeMap);
-                }
-                List<Range> rangesToGive = targetRangeMap.get(rangeSourceTarget.target_);
-                if ( rangesToGive == null )
-                {
-                    rangesToGive = new ArrayList<Range>();
-                    targetRangeMap.put(rangeSourceTarget.target_, rangesToGive);
-                }
-                rangesToGive.add(range);
-            }
-        }
-        
-        Set<EndPoint> sources = rangeInfo.keySet();
-        for ( EndPoint source : sources )
-        {
-            /* only send the message to the nodes that are in the filter. */
-            if ( filters != null && filters.size() > 0 && !filters.contains(source) )
-            {
-                if (logger_.isDebugEnabled())
-                  logger_.debug("Filtering endpoint " + source + " as source ...");
-                continue;
-            }
-            
-            Map<EndPoint, List<Range>> targetRangesMap = rangeInfo.get(source);
-            Set<EndPoint> targets = targetRangesMap.keySet();
-            List<BootstrapMetadata> bsmdList = new ArrayList<BootstrapMetadata>();
-            
-            for ( EndPoint target : targets )
-            {
-                List<Range> rangeForTarget = targetRangesMap.get(target);
-                BootstrapMetadata bsMetadata = new BootstrapMetadata(target, rangeForTarget);
-                bsmdList.add(bsMetadata);
-            }
-            
-            BootstrapMetadataMessage bsMetadataMessage = new BootstrapMetadataMessage(bsmdList.toArray( new BootstrapMetadata[0] ) );
-            /* Send this message to the source to do his shit. */
-            Message message = BootstrapMetadataMessage.makeBootstrapMetadataMessage(bsMetadataMessage); 
-            if (logger_.isDebugEnabled())
-              logger_.debug("Sending the BootstrapMetadataMessage to " + source);
-            MessagingService.getMessagingInstance().sendOneWay(message, source);
-        }
-    }
-}
+ /**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.dht;
+
+ import java.io.IOException;
+ import java.util.ArrayList;
+ import java.util.Arrays;
+ import java.util.HashMap;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.Set;
+
+ import org.apache.log4j.Logger;
+
+ import org.apache.cassandra.net.EndPoint;
+ import org.apache.cassandra.net.Message;
+ import org.apache.cassandra.net.MessagingService;
+
+
+class LeaveJoinProtocolHelper
+{
+    private static Logger logger_ = Logger.getLogger(LeaveJoinProtocolHelper.class);
+    
+    /**
+     * Given a range a-------------b which is being split as
+     * a-----x-----y-----b then we want a mapping from 
+     * (a, b] --> (a, x], (x, y], (y, b] 
+    */
+    protected static Map<Range, List<Range>> getRangeSplitRangeMapping(Range[] oldRanges, Token[] allTokens)
+    {
+        Map<Range, List<Range>> splitRanges = new HashMap<Range, List<Range>>();
+        Token[] tokens = new Token[allTokens.length];
+        System.arraycopy(allTokens, 0, tokens, 0, tokens.length);
+        Arrays.sort(tokens);
+        
+        Range prevRange = null;
+        Token prevToken = null;
+        boolean bVal = false;
+        
+        for ( Range oldRange : oldRanges )
+        {
+            if (bVal)
+            {
+                bVal = false; 
+                List<Range> subRanges = splitRanges.get(prevRange);
+                if ( subRanges != null )
+                    subRanges.add( new Range(prevToken, prevRange.right()) );     
+            }
+            
+            prevRange = oldRange;
+            prevToken = oldRange.left();                
+            for (Token token : tokens)
+            {     
+                List<Range> subRanges = splitRanges.get(oldRange);
+                if ( oldRange.contains(token) )
+                {                        
+                    if ( subRanges == null )
+                    {
+                        subRanges = new ArrayList<Range>();
+                        splitRanges.put(oldRange, subRanges);
+                    }                            
+                    subRanges.add( new Range(prevToken, token) );
+                    prevToken = token;
+                    bVal = true;
+                }
+                else
+                {
+                    if ( bVal )
+                    {
+                        bVal = false;                                                                                
+                        subRanges.add( new Range(prevToken, oldRange.right()) );                            
+                    }
+                }
+            }
+        }
+        /* This is to handle the last range being processed. */
+        if ( bVal )
+        {
+            bVal = false; 
+            List<Range> subRanges = splitRanges.get(prevRange);
+            subRanges.add( new Range(prevToken, prevRange.right()) );                            
+        }
+        return splitRanges;
+    }
+    
+    protected static Map<Range, List<BootstrapSourceTarget>> getRangeSourceTargetInfo(Map<Range, List<EndPoint>> oldRangeToEndPointMap, Map<Range, List<EndPoint>> newRangeToEndPointMap)
+    {
+        Map<Range, List<BootstrapSourceTarget>> rangesWithSourceTarget = new HashMap<Range, List<BootstrapSourceTarget>>();
+        /*
+         * Basically calculate for each range the endpoints handling the
+         * range in the old token set and in the new token set. Whoever
+         * gets bumped out of the top N will have to hand off that range
+         * to the new dude.
+        */
+        Set<Range> oldRangeSet = oldRangeToEndPointMap.keySet();
+        for(Range range : oldRangeSet)
+        {
+            if (logger_.isDebugEnabled())
+              logger_.debug("Attempting to figure out the dudes who are bumped out for " + range + " ...");
+            List<EndPoint> oldEndPoints = oldRangeToEndPointMap.get(range);
+            List<EndPoint> newEndPoints = newRangeToEndPointMap.get(range);
+            if ( newEndPoints != null )
+            {                        
+                List<EndPoint> newEndPoints2 = new ArrayList<EndPoint>(newEndPoints);
+                for ( EndPoint newEndPoint : newEndPoints2 )
+                {
+                    if ( oldEndPoints.contains(newEndPoint) )
+                    {
+                        oldEndPoints.remove(newEndPoint);
+                        newEndPoints.remove(newEndPoint);
+                    }
+                }                        
+            }
+            else
+            {
+                logger_.warn("Trespassing - scram");
+            }
+            if (logger_.isDebugEnabled())
+              logger_.debug("Done figuring out the dudes who are bumped out for range " + range + " ...");
+        }
+        for ( Range range : oldRangeSet )
+        {                    
+            List<EndPoint> oldEndPoints = oldRangeToEndPointMap.get(range);
+            List<EndPoint> newEndPoints = newRangeToEndPointMap.get(range);
+            List<BootstrapSourceTarget> srcTarget = rangesWithSourceTarget.get(range);
+            if ( srcTarget == null )
+            {
+                srcTarget = new ArrayList<BootstrapSourceTarget>();
+                rangesWithSourceTarget.put(range, srcTarget);
+            }
+            int i = 0;
+            for ( EndPoint oldEndPoint : oldEndPoints )
+            {                        
+                srcTarget.add( new BootstrapSourceTarget(oldEndPoint, newEndPoints.get(i++)) );
+            }
+        }
+        return rangesWithSourceTarget;
+    }
+    
+    /**
+     * This method sends messages out to nodes instructing them 
+     * to stream the specified ranges to specified target nodes. 
+    */
+    protected static void assignWork(Map<Range, List<BootstrapSourceTarget>> rangesWithSourceTarget) throws IOException
+    {
+        assignWork(rangesWithSourceTarget, null);
+    }
+    
+    /**
+     * Regroups the per-range source/target pairs by source node and sends
+     * every (unfiltered) source a single one-way BootstrapMetadataMessage
+     * listing, for each target, the ranges that source has to stream to it.
+     *
+     * @param rangesWithSourceTarget for each range, the source/target pairs
+     *        describing which node should stream that range to which node
+     * @param filters when non-null and non-empty, only sources contained in
+     *        this list are sent a message; when null or empty every source
+     *        is sent one
+     * @throws IOException declared by the signature; NOTE(review): presumably
+     *         raised while building the message -- confirm against
+     *         BootstrapMetadataMessage.makeBootstrapMetadataMessage
+     */
+    protected static void assignWork(Map<Range, List<BootstrapSourceTarget>> rangesWithSourceTarget, List<EndPoint> filters) throws IOException
+    {
+        /* source node -> (target node -> ranges the source must send to that target) */
+        Map<EndPoint, Map<EndPoint, List<Range>>> workBySource = new HashMap<EndPoint, Map<EndPoint, List<Range>>>();
+
+        /* Step 1: invert the range-keyed input into the source-keyed map. */
+        for ( Map.Entry<Range, List<BootstrapSourceTarget>> rangeEntry : rangesWithSourceTarget.entrySet() )
+        {
+            Range range = rangeEntry.getKey();
+            for ( BootstrapSourceTarget pair : rangeEntry.getValue() )
+            {
+                Map<EndPoint, List<Range>> rangesByTarget = workBySource.get(pair.source_);
+                if ( rangesByTarget == null )
+                {
+                    rangesByTarget = new HashMap<EndPoint, List<Range>>();
+                    workBySource.put(pair.source_, rangesByTarget);
+                }
+
+                List<Range> rangesForTarget = rangesByTarget.get(pair.target_);
+                if ( rangesForTarget == null )
+                {
+                    rangesForTarget = new ArrayList<Range>();
+                    rangesByTarget.put(pair.target_, rangesForTarget);
+                }
+                rangesForTarget.add(range);
+            }
+        }
+
+        /* Step 2: one message per source, carrying one metadata entry per target. */
+        for ( Map.Entry<EndPoint, Map<EndPoint, List<Range>>> sourceEntry : workBySource.entrySet() )
+        {
+            EndPoint source = sourceEntry.getKey();
+
+            /* A non-empty filter list restricts the recipients to the sources it contains. */
+            boolean excluded = filters != null && !filters.isEmpty() && !filters.contains(source);
+            if ( excluded )
+            {
+                if ( logger_.isDebugEnabled() )
+                {
+                    logger_.debug("Filtering endpoint " + source + " as source ...");
+                }
+                continue;
+            }
+
+            List<BootstrapMetadata> metadataForSource = new ArrayList<BootstrapMetadata>();
+            for ( Map.Entry<EndPoint, List<Range>> targetEntry : sourceEntry.getValue().entrySet() )
+            {
+                metadataForSource.add( new BootstrapMetadata(targetEntry.getKey(), targetEntry.getValue()) );
+            }
+
+            BootstrapMetadata[] metadataArray = metadataForSource.toArray( new BootstrapMetadata[metadataForSource.size()] );
+            BootstrapMetadataMessage payload = new BootstrapMetadataMessage(metadataArray);
+            /* Hand the work description to the source node; no reply is awaited. */
+            Message message = BootstrapMetadataMessage.makeBootstrapMetadataMessage(payload);
+            if ( logger_.isDebugEnabled() )
+            {
+                logger_.debug("Sending the BootstrapMetadataMessage to " + source);
+            }
+            MessagingService.getMessagingInstance().sendOneWay(message, source);
+        }
+    }
+}



Mime
View raw message