cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbel...@apache.org
Subject svn commit: r799331 [23/29] - in /incubator/cassandra/trunk: ./ src/java/org/apache/cassandra/concurrent/ src/java/org/apache/cassandra/config/ src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/dht/ src/java/org/apache/cassandra/gms/ src/...
Date Thu, 30 Jul 2009 15:30:27 GMT
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/StreamContextManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/StreamContextManager.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/StreamContextManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/StreamContextManager.java Thu Jul 30 15:30:21 2009
@@ -1,328 +1,328 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.net.io;
-
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.*;
-
-import org.apache.cassandra.io.ICompactSerializer;
-import org.apache.cassandra.net.Message;
-import org.apache.cassandra.service.StorageService;
-import org.apache.log4j.Logger;
-
-/**
- * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
- */
-
-public class StreamContextManager
-{
-    private static Logger logger_ = Logger.getLogger(StreamContextManager.class);
-    
-    public static enum StreamCompletionAction
-    {
-        DELETE,
-        STREAM
-    }
-    
-    public static class StreamContext implements Serializable
-    {
-        private static Logger logger_ = Logger.getLogger(StreamContextManager.StreamContext.class);
-        private static ICompactSerializer<StreamContext> serializer_;
-        
-        static
-        {
-            serializer_ = new StreamContextSerializer();
-        }
-        
-        public static ICompactSerializer<StreamContext> serializer()
-        {
-            return serializer_;
-        }
-                
-        private String targetFile_;        
-        private long expectedBytes_;                     
-        private String table_;
-        
-        public StreamContext(String targetFile, long expectedBytes, String table)
-        {
-            targetFile_ = targetFile;
-            expectedBytes_ = expectedBytes;         
-            table_ = table;
-        }
-
-        public String getTable()
-        {
-            return table_;
-        }                
-                
-        public String getTargetFile()
-        {
-            return targetFile_;
-        }
-        
-        public void setTargetFile(String file)
-        {
-            targetFile_ = file;
-        }
-        
-        public long getExpectedBytes()
-        {
-            return expectedBytes_;
-        }
-                
-        public boolean equals(Object o)
-        {
-            if ( !(o instanceof StreamContext) )
-                return false;
-            
-            StreamContext rhs = (StreamContext)o;
-            return targetFile_.equals(rhs.targetFile_);
-        }
-        
-        public int hashCode()
-        {
-            return toString().hashCode();
-        }
-        
-        public String toString()
-        {
-            return targetFile_ + ":" + expectedBytes_;
-        }
-    }
-    
-    public static class StreamContextSerializer implements ICompactSerializer<StreamContext>
-    {
-        public void serialize(StreamContextManager.StreamContext sc, DataOutputStream dos) throws IOException
-        {
-            dos.writeUTF(sc.targetFile_);
-            dos.writeLong(sc.expectedBytes_);            
-            dos.writeUTF(sc.table_);
-        }
-        
-        public StreamContextManager.StreamContext deserialize(DataInputStream dis) throws IOException
-        {
-            String targetFile = dis.readUTF();
-            long expectedBytes = dis.readLong();           
-            String table = dis.readUTF();
-            return new StreamContext(targetFile, expectedBytes, table);
-        }
-    }
-    
-    public static class StreamStatus
-    {
-        private static ICompactSerializer<StreamStatus> serializer_;
-        
-        static 
-        {
-            serializer_ = new StreamStatusSerializer();
-        }
-        
-        public static ICompactSerializer<StreamStatus> serializer()
-        {
-            return serializer_;
-        }
-            
-        private String file_;               
-        private long expectedBytes_;                
-        private StreamCompletionAction action_;
-                
-        public StreamStatus(String file, long expectedBytes)
-        {
-            file_ = file;
-            expectedBytes_ = expectedBytes;
-            action_ = StreamContextManager.StreamCompletionAction.DELETE;
-        }
-        
-        public String getFile()
-        {
-            return file_;
-        }
-        
-        public long getExpectedBytes()
-        {
-            return expectedBytes_;
-        }
-        
-        void setAction(StreamContextManager.StreamCompletionAction action)
-        {
-            action_ = action;
-        }
-        
-        public StreamContextManager.StreamCompletionAction getAction()
-        {
-            return action_;
-        }
-    }
-    
-    public static class StreamStatusSerializer implements ICompactSerializer<StreamStatus>
-    {
-        public void serialize(StreamStatus streamStatus, DataOutputStream dos) throws IOException
-        {
-            dos.writeUTF(streamStatus.getFile());
-            dos.writeLong(streamStatus.getExpectedBytes());
-            dos.writeInt(streamStatus.getAction().ordinal());
-        }
-        
-        public StreamStatus deserialize(DataInputStream dis) throws IOException
-        {
-            String targetFile = dis.readUTF();
-            long expectedBytes = dis.readLong();
-            StreamStatus streamStatus = new StreamStatus(targetFile, expectedBytes);
-            
-            int ordinal = dis.readInt();                        
-            if ( ordinal == StreamCompletionAction.DELETE.ordinal() )
-            {
-                streamStatus.setAction(StreamCompletionAction.DELETE);
-            }
-            else if ( ordinal == StreamCompletionAction.STREAM.ordinal() )
-            {
-                streamStatus.setAction(StreamCompletionAction.STREAM);
-            }
-            
-            return streamStatus;
-        }
-    }
-    
-    public static class StreamStatusMessage implements Serializable
-    {
-        private static ICompactSerializer<StreamStatusMessage> serializer_;
-        
-        static 
-        {
-            serializer_ = new StreamStatusMessageSerializer();
-        }
-        
-        public static ICompactSerializer<StreamStatusMessage> serializer()
-        {
-            return serializer_;
-        }
-        
-        public static Message makeStreamStatusMessage(StreamStatusMessage streamStatusMessage) throws IOException
-        {
-            ByteArrayOutputStream bos = new ByteArrayOutputStream();
-            DataOutputStream dos = new DataOutputStream( bos );
-            StreamStatusMessage.serializer().serialize(streamStatusMessage, dos);
-            return new Message(StorageService.getLocalStorageEndPoint(), "", StorageService.bootStrapTerminateVerbHandler_, bos.toByteArray());
-        }
-        
-        protected StreamContextManager.StreamStatus streamStatus_;
-        
-        public StreamStatusMessage(StreamContextManager.StreamStatus streamStatus)
-        {
-            streamStatus_ = streamStatus;
-        }
-        
-        public StreamContextManager.StreamStatus getStreamStatus()
-        {
-            return streamStatus_;
-        }
-    }
-    
-    public static class StreamStatusMessageSerializer implements ICompactSerializer<StreamStatusMessage>
-    {
-        public void serialize(StreamStatusMessage streamStatusMessage, DataOutputStream dos) throws IOException
-        {
-            StreamStatus.serializer().serialize(streamStatusMessage.streamStatus_, dos);            
-        }
-        
-        public StreamStatusMessage deserialize(DataInputStream dis) throws IOException
-        {            
-            StreamContextManager.StreamStatus streamStatus = StreamStatus.serializer().deserialize(dis);         
-            return new StreamStatusMessage(streamStatus);
-        }
-    }
-        
-    /* Maintain a stream context per host that is the source of the stream */
-    public static Map<String, List<StreamContext>> ctxBag_ = new Hashtable<String, List<StreamContext>>();  
-    /* Maintain in this map the status of the streams that need to be sent back to the source */
-    public static Map<String, List<StreamStatus>> streamStatusBag_ = new Hashtable<String, List<StreamStatus>>();
-    /* Maintains a callback handler per endpoint to notify the app that a stream from a given endpoint has been handled */
-    public static Map<String, IStreamComplete> streamNotificationHandlers_ = new HashMap<String, IStreamComplete>();
-    
-    public synchronized static StreamContext getStreamContext(String key)
-    {        
-        List<StreamContext> context = ctxBag_.get(key);
-        if ( context == null )
-            throw new IllegalStateException("Streaming context has not been set.");
-        StreamContext streamContext = context.remove(0);        
-        if ( context.isEmpty() )
-            ctxBag_.remove(key);
-        return streamContext;
-    }
-    
-    public synchronized static StreamStatus getStreamStatus(String key)
-    {
-        List<StreamStatus> status = streamStatusBag_.get(key);
-        if ( status == null )
-            throw new IllegalStateException("Streaming status has not been set.");
-        StreamStatus streamStatus = status.remove(0);        
-        if ( status.isEmpty() )
-            streamStatusBag_.remove(key);
-        return streamStatus;
-    }
-    
-    /*
-     * This method helps determine if the StreamCompletionHandler needs
-     * to be invoked for the data being streamed from a source. 
-    */
-    public synchronized static boolean isDone(String key)
-    {
-        return (ctxBag_.get(key) == null);
-    }
-    
-    public synchronized static IStreamComplete getStreamCompletionHandler(String key)
-    {
-        return streamNotificationHandlers_.get(key);
-    }
-    
-    public synchronized static void removeStreamCompletionHandler(String key)
-    {
-        streamNotificationHandlers_.remove(key);
-    }
-    
-    public synchronized static void registerStreamCompletionHandler(String key, IStreamComplete streamComplete)
-    {
-        streamNotificationHandlers_.put(key, streamComplete);
-    }
-    
-    public synchronized static void addStreamContext(String key, StreamContext streamContext, StreamStatus streamStatus)
-    {
-        /* Record the stream context */
-        List<StreamContext> context = ctxBag_.get(key);        
-        if ( context == null )
-        {
-            context = new ArrayList<StreamContext>();
-            ctxBag_.put(key, context);
-        }
-        context.add(streamContext);
-        
-        /* Record the stream status for this stream context */
-        List<StreamStatus> status = streamStatusBag_.get(key);
-        if ( status == null )
-        {
-            status = new ArrayList<StreamStatus>();
-            streamStatusBag_.put(key, status);
-        }
-        status.add( streamStatus );
-    }        
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net.io;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.*;
+
+import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.service.StorageService;
+import org.apache.log4j.Logger;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class StreamContextManager
+{
+    private static Logger logger_ = Logger.getLogger(StreamContextManager.class);
+    
+    public static enum StreamCompletionAction
+    {
+        DELETE,
+        STREAM
+    }
+    
+    public static class StreamContext implements Serializable
+    {
+        private static Logger logger_ = Logger.getLogger(StreamContextManager.StreamContext.class);
+        private static ICompactSerializer<StreamContext> serializer_;
+        
+        static
+        {
+            serializer_ = new StreamContextSerializer();
+        }
+        
+        public static ICompactSerializer<StreamContext> serializer()
+        {
+            return serializer_;
+        }
+                
+        private String targetFile_;        
+        private long expectedBytes_;                     
+        private String table_;
+        
+        public StreamContext(String targetFile, long expectedBytes, String table)
+        {
+            targetFile_ = targetFile;
+            expectedBytes_ = expectedBytes;         
+            table_ = table;
+        }
+
+        public String getTable()
+        {
+            return table_;
+        }                
+                
+        public String getTargetFile()
+        {
+            return targetFile_;
+        }
+        
+        public void setTargetFile(String file)
+        {
+            targetFile_ = file;
+        }
+        
+        public long getExpectedBytes()
+        {
+            return expectedBytes_;
+        }
+                
+        public boolean equals(Object o)
+        {
+            if ( !(o instanceof StreamContext) )
+                return false;
+            
+            StreamContext rhs = (StreamContext)o;
+            return targetFile_.equals(rhs.targetFile_);
+        }
+        
+        public int hashCode()
+        {
+            return toString().hashCode();
+        }
+        
+        public String toString()
+        {
+            return targetFile_ + ":" + expectedBytes_;
+        }
+    }
+    
+    public static class StreamContextSerializer implements ICompactSerializer<StreamContext>
+    {
+        public void serialize(StreamContextManager.StreamContext sc, DataOutputStream dos) throws IOException
+        {
+            dos.writeUTF(sc.targetFile_);
+            dos.writeLong(sc.expectedBytes_);            
+            dos.writeUTF(sc.table_);
+        }
+        
+        public StreamContextManager.StreamContext deserialize(DataInputStream dis) throws IOException
+        {
+            String targetFile = dis.readUTF();
+            long expectedBytes = dis.readLong();           
+            String table = dis.readUTF();
+            return new StreamContext(targetFile, expectedBytes, table);
+        }
+    }
+    
+    public static class StreamStatus
+    {
+        private static ICompactSerializer<StreamStatus> serializer_;
+        
+        static 
+        {
+            serializer_ = new StreamStatusSerializer();
+        }
+        
+        public static ICompactSerializer<StreamStatus> serializer()
+        {
+            return serializer_;
+        }
+            
+        private String file_;               
+        private long expectedBytes_;                
+        private StreamCompletionAction action_;
+                
+        public StreamStatus(String file, long expectedBytes)
+        {
+            file_ = file;
+            expectedBytes_ = expectedBytes;
+            action_ = StreamContextManager.StreamCompletionAction.DELETE;
+        }
+        
+        public String getFile()
+        {
+            return file_;
+        }
+        
+        public long getExpectedBytes()
+        {
+            return expectedBytes_;
+        }
+        
+        void setAction(StreamContextManager.StreamCompletionAction action)
+        {
+            action_ = action;
+        }
+        
+        public StreamContextManager.StreamCompletionAction getAction()
+        {
+            return action_;
+        }
+    }
+    
+    public static class StreamStatusSerializer implements ICompactSerializer<StreamStatus>
+    {
+        public void serialize(StreamStatus streamStatus, DataOutputStream dos) throws IOException
+        {
+            dos.writeUTF(streamStatus.getFile());
+            dos.writeLong(streamStatus.getExpectedBytes());
+            dos.writeInt(streamStatus.getAction().ordinal());
+        }
+        
+        public StreamStatus deserialize(DataInputStream dis) throws IOException
+        {
+            String targetFile = dis.readUTF();
+            long expectedBytes = dis.readLong();
+            StreamStatus streamStatus = new StreamStatus(targetFile, expectedBytes);
+            
+            int ordinal = dis.readInt();                        
+            if ( ordinal == StreamCompletionAction.DELETE.ordinal() )
+            {
+                streamStatus.setAction(StreamCompletionAction.DELETE);
+            }
+            else if ( ordinal == StreamCompletionAction.STREAM.ordinal() )
+            {
+                streamStatus.setAction(StreamCompletionAction.STREAM);
+            }
+            
+            return streamStatus;
+        }
+    }
+    
+    public static class StreamStatusMessage implements Serializable
+    {
+        private static ICompactSerializer<StreamStatusMessage> serializer_;
+        
+        static 
+        {
+            serializer_ = new StreamStatusMessageSerializer();
+        }
+        
+        public static ICompactSerializer<StreamStatusMessage> serializer()
+        {
+            return serializer_;
+        }
+        
+        public static Message makeStreamStatusMessage(StreamStatusMessage streamStatusMessage) throws IOException
+        {
+            ByteArrayOutputStream bos = new ByteArrayOutputStream();
+            DataOutputStream dos = new DataOutputStream( bos );
+            StreamStatusMessage.serializer().serialize(streamStatusMessage, dos);
+            return new Message(StorageService.getLocalStorageEndPoint(), "", StorageService.bootStrapTerminateVerbHandler_, bos.toByteArray());
+        }
+        
+        protected StreamContextManager.StreamStatus streamStatus_;
+        
+        public StreamStatusMessage(StreamContextManager.StreamStatus streamStatus)
+        {
+            streamStatus_ = streamStatus;
+        }
+        
+        public StreamContextManager.StreamStatus getStreamStatus()
+        {
+            return streamStatus_;
+        }
+    }
+    
+    public static class StreamStatusMessageSerializer implements ICompactSerializer<StreamStatusMessage>
+    {
+        public void serialize(StreamStatusMessage streamStatusMessage, DataOutputStream dos) throws IOException
+        {
+            StreamStatus.serializer().serialize(streamStatusMessage.streamStatus_, dos);            
+        }
+        
+        public StreamStatusMessage deserialize(DataInputStream dis) throws IOException
+        {            
+            StreamContextManager.StreamStatus streamStatus = StreamStatus.serializer().deserialize(dis);         
+            return new StreamStatusMessage(streamStatus);
+        }
+    }
+        
+    /* Maintain a stream context per host that is the source of the stream */
+    public static Map<String, List<StreamContext>> ctxBag_ = new Hashtable<String, List<StreamContext>>();  
+    /* Maintain in this map the status of the streams that need to be sent back to the source */
+    public static Map<String, List<StreamStatus>> streamStatusBag_ = new Hashtable<String, List<StreamStatus>>();
+    /* Maintains a callback handler per endpoint to notify the app that a stream from a given endpoint has been handled */
+    public static Map<String, IStreamComplete> streamNotificationHandlers_ = new HashMap<String, IStreamComplete>();
+    
+    public synchronized static StreamContext getStreamContext(String key)
+    {        
+        List<StreamContext> context = ctxBag_.get(key);
+        if ( context == null )
+            throw new IllegalStateException("Streaming context has not been set.");
+        StreamContext streamContext = context.remove(0);        
+        if ( context.isEmpty() )
+            ctxBag_.remove(key);
+        return streamContext;
+    }
+    
+    public synchronized static StreamStatus getStreamStatus(String key)
+    {
+        List<StreamStatus> status = streamStatusBag_.get(key);
+        if ( status == null )
+            throw new IllegalStateException("Streaming status has not been set.");
+        StreamStatus streamStatus = status.remove(0);        
+        if ( status.isEmpty() )
+            streamStatusBag_.remove(key);
+        return streamStatus;
+    }
+    
+    /*
+     * This method helps determine if the StreamCompletionHandler needs
+     * to be invoked for the data being streamed from a source. 
+    */
+    public synchronized static boolean isDone(String key)
+    {
+        return (ctxBag_.get(key) == null);
+    }
+    
+    public synchronized static IStreamComplete getStreamCompletionHandler(String key)
+    {
+        return streamNotificationHandlers_.get(key);
+    }
+    
+    public synchronized static void removeStreamCompletionHandler(String key)
+    {
+        streamNotificationHandlers_.remove(key);
+    }
+    
+    public synchronized static void registerStreamCompletionHandler(String key, IStreamComplete streamComplete)
+    {
+        streamNotificationHandlers_.put(key, streamComplete);
+    }
+    
+    public synchronized static void addStreamContext(String key, StreamContext streamContext, StreamStatus streamStatus)
+    {
+        /* Record the stream context */
+        List<StreamContext> context = ctxBag_.get(key);        
+        if ( context == null )
+        {
+            context = new ArrayList<StreamContext>();
+            ctxBag_.put(key, context);
+        }
+        context.add(streamContext);
+        
+        /* Record the stream status for this stream context */
+        List<StreamStatus> status = streamStatusBag_.get(key);
+        if ( status == null )
+        {
+            status = new ArrayList<StreamStatus>();
+            streamStatusBag_.put(key, status);
+        }
+        status.add( streamStatus );
+    }        
+}

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/TcpReader.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/TcpReader.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/TcpReader.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/TcpReader.java Thu Jul 30 15:30:21 2009
@@ -1,122 +1,122 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.net.io;
-
-import java.io.IOException;
-import java.nio.channels.SocketChannel;
-import java.util.*;
-
-import org.apache.cassandra.net.ProtocolHeader;
-import org.apache.cassandra.net.TcpConnection;
-
-/**
- * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
- */
-
-public class TcpReader
-{
-    public static enum TcpReaderState
-    {
-        START,
-        PREAMBLE,
-        PROTOCOL,
-        CONTENT_LENGTH,
-        CONTENT,
-        CONTENT_STREAM,
-        DONE
-    }
-    
-    private Map<TcpReaderState, StartState> stateMap_ = new HashMap<TcpReaderState, StartState>();
-    private TcpConnection connection_;
-    private StartState socketState_;
-    private ProtocolHeader protocolHeader_;
-    
-    public TcpReader(TcpConnection connection)
-    {
-        connection_ = connection;        
-    }
-    
-    public StartState getSocketState(TcpReaderState state)
-    {
-        return stateMap_.get(state);
-    }
-    
-    public void putSocketState(TcpReaderState state, StartState socketState)
-    {
-        stateMap_.put(state, socketState);
-    } 
-    
-    public void resetState()
-    {
-        StartState nextState = stateMap_.get(TcpReaderState.PREAMBLE);
-        if ( nextState == null )
-        {
-            nextState = new ProtocolState(this);
-            stateMap_.put(TcpReaderState.PREAMBLE, nextState);
-        }
-        socketState_ = nextState;
-    }
-    
-    public void morphState(StartState state)
-    {        
-        socketState_ = state;
-        if ( protocolHeader_ == null )
-            protocolHeader_ = new ProtocolHeader();
-    }
-    
-    public ProtocolHeader getProtocolHeader()
-    {
-        return protocolHeader_;
-    }
-    
-    public SocketChannel getStream()
-    {
-        return connection_.getSocketChannel();
-    }
-    
-    public byte[] read() throws IOException
-    {
-        byte[] bytes = new byte[0];      
-        while ( socketState_ != null )
-        {
-            try
-            {                                                                      
-                bytes = socketState_.read();
-            }
-            catch ( ReadNotCompleteException e )
-            {                
-                break;
-            }
-        }
-        return bytes;
-    }    
-    
-    public static void main(String[] args) throws Throwable
-    {
-        Map<TcpReaderState, StartState> stateMap = new HashMap<TcpReaderState, StartState>();
-        stateMap.put(TcpReaderState.CONTENT, new ContentState(null, 10));
-        stateMap.put(TcpReaderState.START, new ProtocolState(null));
-        stateMap.put(TcpReaderState.CONTENT_LENGTH, new ContentLengthState(null));
-        
-        StartState state = stateMap.get(TcpReaderState.CONTENT);
-        System.out.println( state.getClass().getName() );
-        state = stateMap.get(TcpReaderState.CONTENT_LENGTH);
-        System.out.println( state.getClass().getName() );
-    }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net.io;
+
+import java.io.IOException;
+import java.nio.channels.SocketChannel;
+import java.util.*;
+
+import org.apache.cassandra.net.ProtocolHeader;
+import org.apache.cassandra.net.TcpConnection;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class TcpReader
+{
+    public static enum TcpReaderState
+    {
+        START,
+        PREAMBLE,
+        PROTOCOL,
+        CONTENT_LENGTH,
+        CONTENT,
+        CONTENT_STREAM,
+        DONE
+    }
+    
+    private Map<TcpReaderState, StartState> stateMap_ = new HashMap<TcpReaderState, StartState>();
+    private TcpConnection connection_;
+    private StartState socketState_;
+    private ProtocolHeader protocolHeader_;
+    
+    public TcpReader(TcpConnection connection)
+    {
+        connection_ = connection;        
+    }
+    
+    public StartState getSocketState(TcpReaderState state)
+    {
+        return stateMap_.get(state);
+    }
+    
+    public void putSocketState(TcpReaderState state, StartState socketState)
+    {
+        stateMap_.put(state, socketState);
+    } 
+    
+    public void resetState()
+    {
+        StartState nextState = stateMap_.get(TcpReaderState.PREAMBLE);
+        if ( nextState == null )
+        {
+            nextState = new ProtocolState(this);
+            stateMap_.put(TcpReaderState.PREAMBLE, nextState);
+        }
+        socketState_ = nextState;
+    }
+    
+    public void morphState(StartState state)
+    {        
+        socketState_ = state;
+        if ( protocolHeader_ == null )
+            protocolHeader_ = new ProtocolHeader();
+    }
+    
+    public ProtocolHeader getProtocolHeader()
+    {
+        return protocolHeader_;
+    }
+    
+    public SocketChannel getStream()
+    {
+        return connection_.getSocketChannel();
+    }
+    
+    public byte[] read() throws IOException
+    {
+        byte[] bytes = new byte[0];      
+        while ( socketState_ != null )
+        {
+            try
+            {                                                                      
+                bytes = socketState_.read();
+            }
+            catch ( ReadNotCompleteException e )
+            {                
+                break;
+            }
+        }
+        return bytes;
+    }    
+    
+    public static void main(String[] args) throws Throwable
+    {
+        Map<TcpReaderState, StartState> stateMap = new HashMap<TcpReaderState, StartState>();
+        stateMap.put(TcpReaderState.CONTENT, new ContentState(null, 10));
+        stateMap.put(TcpReaderState.START, new ProtocolState(null));
+        stateMap.put(TcpReaderState.CONTENT_LENGTH, new ContentLengthState(null));
+        
+        StartState state = stateMap.get(TcpReaderState.CONTENT);
+        System.out.println( state.getClass().getName() );
+        state = stateMap.get(TcpReaderState.CONTENT_LENGTH);
+        System.out.println( state.getClass().getName() );
+    }
+}

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/sink/IMessageSink.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/sink/IMessageSink.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/sink/IMessageSink.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/sink/IMessageSink.java Thu Jul 30 15:30:21 2009
@@ -1,30 +1,30 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.net.sink;
-
-import org.apache.cassandra.net.EndPoint;
-import org.apache.cassandra.net.Message;
-/**
- * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
- */
-
-public interface IMessageSink
-{
-    public Message handleMessage(Message message);    
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net.sink;
+
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.net.Message;
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public interface IMessageSink
+{
+    public Message handleMessage(Message message);    
+}

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/sink/SinkManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/sink/SinkManager.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/sink/SinkManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/sink/SinkManager.java Thu Jul 30 15:30:21 2009
@@ -1,78 +1,78 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.net.sink;
-
-import java.util.*;
-import java.io.IOException;
-
-import org.apache.cassandra.net.EndPoint;
-import org.apache.cassandra.net.Message;
-
-/**
- * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
- */
-
-public class SinkManager
-{
-    private static LinkedList<IMessageSink> messageSinks_ = new LinkedList<IMessageSink>();
-
-    public static boolean isInitialized()
-    {
-        return ( messageSinks_.size() > 0 );
-    }
-
-    public static void addMessageSink(IMessageSink ms)
-    {
-        messageSinks_.addLast(ms);
-    }
-    
-    public static void clearSinks(){
-        messageSinks_.clear();
-    }
-
-    public static Message processClientMessageSink(Message message)
-    {
-        ListIterator<IMessageSink> li = messageSinks_.listIterator();
-        while ( li.hasNext() )
-        {
-            IMessageSink ms = li.next();
-            message = ms.handleMessage(message);
-            if ( message == null )
-            {
-                return null;
-            }
-        }
-        return message;
-    }
-
-    public static Message processServerMessageSink(Message message)
-    {
-        ListIterator<IMessageSink> li = messageSinks_.listIterator(messageSinks_.size());
-        while ( li.hasPrevious() )
-        {
-            IMessageSink ms = li.previous();
-            message = ms.handleMessage(message);
-            if ( message == null )
-            {
-                return null;
-            }
-        }
-        return message;
-    }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net.sink;
+
+import java.util.*;
+import java.io.IOException;
+
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.net.Message;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class SinkManager
+{
+    private static LinkedList<IMessageSink> messageSinks_ = new LinkedList<IMessageSink>();
+
+    public static boolean isInitialized()
+    {
+        return ( messageSinks_.size() > 0 );
+    }
+
+    public static void addMessageSink(IMessageSink ms)
+    {
+        messageSinks_.addLast(ms);
+    }
+    
+    public static void clearSinks(){
+        messageSinks_.clear();
+    }
+
+    public static Message processClientMessageSink(Message message)
+    {
+        ListIterator<IMessageSink> li = messageSinks_.listIterator();
+        while ( li.hasNext() )
+        {
+            IMessageSink ms = li.next();
+            message = ms.handleMessage(message);
+            if ( message == null )
+            {
+                return null;
+            }
+        }
+        return message;
+    }
+
+    public static Message processServerMessageSink(Message message)
+    {
+        ListIterator<IMessageSink> li = messageSinks_.listIterator(messageSinks_.size());
+        while ( li.hasPrevious() )
+        {
+            IMessageSink ms = li.previous();
+            message = ms.handleMessage(message);
+            if ( message == null )
+            {
+                return null;
+            }
+        }
+        return message;
+    }
+}

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/procedures/GroovyScriptRunner.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/procedures/GroovyScriptRunner.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/procedures/GroovyScriptRunner.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/procedures/GroovyScriptRunner.java Thu Jul 30 15:30:21 2009
@@ -16,16 +16,16 @@
 * specific language governing permissions and limitations
 * under the License.
 */
-package org.apache.cassandra.procedures;
-
-import groovy.lang.GroovyShell;
-
-public class GroovyScriptRunner
-{
-	private static GroovyShell groovyShell_ = new GroovyShell();
-
-	public static String evaluateString(String script)
-	{        
-		 return groovyShell_.evaluate(script).toString();
-	}
-}
+package org.apache.cassandra.procedures;
+
+import groovy.lang.GroovyShell;
+
+public class GroovyScriptRunner
+{
+	private static GroovyShell groovyShell_ = new GroovyShell();
+
+	public static String evaluateString(String script)
+	{        
+		 return groovyShell_.evaluate(script).toString();
+	}
+}

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyManager.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyManager.java Thu Jul 30 15:30:21 2009
@@ -1,183 +1,183 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.service;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.ReadCommand;
-import org.apache.cassandra.db.ReadResponse;
-import org.apache.cassandra.db.Row;
-import org.apache.cassandra.io.DataInputBuffer;
-import org.apache.cassandra.net.EndPoint;
-import org.apache.cassandra.net.IAsyncCallback;
-import org.apache.cassandra.net.Message;
-import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.utils.Cachetable;
-import org.apache.cassandra.utils.ICacheExpungeHook;
-import org.apache.cassandra.utils.ICachetable;
-import org.apache.cassandra.utils.LogUtil;
-import org.apache.log4j.Logger;
-import org.apache.commons.lang.StringUtils;
-
-
-class ConsistencyManager implements Runnable
-{
-	private static Logger logger_ = Logger.getLogger(ConsistencyManager.class);
-	
-	class DigestResponseHandler implements IAsyncCallback
-	{
-		List<Message> responses_ = new ArrayList<Message>();
-		
-		public void response(Message msg)
-		{
-			responses_.add(msg);
-			if ( responses_.size() == ConsistencyManager.this.replicas_.size() )
-				handleDigestResponses();
-		}
-        
-        public void attachContext(Object o)
-        {
-            throw new UnsupportedOperationException("This operation is not currently supported.");
-        }
-		
-		private void handleDigestResponses()
-		{
-			DataInputBuffer bufIn = new DataInputBuffer();
-			for( Message response : responses_ )
-			{
-				byte[] body = response.getMessageBody();            
-	            bufIn.reset(body, body.length);
-	            try
-	            {	               
-	                ReadResponse result = ReadResponse.serializer().deserialize(bufIn);
-	                byte[] digest = result.digest();
-	                if( !Arrays.equals(row_.digest(), digest) )
-					{
-	                	doReadRepair();
-	                	break;
-					}
-	            }
-	            catch( IOException ex )
-	            {
-	            	logger_.info(LogUtil.throwableToString(ex));
-	            }
-			}
-		}
-		
-		private void doReadRepair() throws IOException
-		{
-			IResponseResolver<Row> readResponseResolver = new ReadResponseResolver();
-            /* Add the local storage endpoint to the replicas_ list */
-            replicas_.add(StorageService.getLocalStorageEndPoint());
-			IAsyncCallback responseHandler = new DataRepairHandler(ConsistencyManager.this.replicas_.size(), readResponseResolver);	
-            ReadCommand readCommand = constructReadMessage(false);
-            Message message = readCommand.makeReadMessage();
-            if (logger_.isDebugEnabled())
-              logger_.debug("Performing read repair for " + readCommand_.key + " to " + message.getMessageId() + "@[" + StringUtils.join(replicas_, ", ") + "]");
-			MessagingService.getMessagingInstance().sendRR(message, replicas_.toArray(new EndPoint[replicas_.size()]), responseHandler);
-		}
-	}
-	
-	class DataRepairHandler implements IAsyncCallback, ICacheExpungeHook<String, String>
-	{
-		private List<Message> responses_ = new ArrayList<Message>();
-		private IResponseResolver<Row> readResponseResolver_;
-		private int majority_;
-		
-		DataRepairHandler(int responseCount, IResponseResolver<Row> readResponseResolver)
-		{
-			readResponseResolver_ = readResponseResolver;
-			majority_ = (responseCount >> 1) + 1;  
-		}
-		
-		public void response(Message message)
-		{
-			if (logger_.isDebugEnabled())
-			  logger_.debug("Received responses in DataRepairHandler : " + message.toString());
-			responses_.add(message);
-			if ( responses_.size() == majority_ )
-			{
-				String messageId = message.getMessageId();
-				readRepairTable_.put(messageId, messageId, this);				
-			}
-		}
-        
-        public void attachContext(Object o)
-        {
-            throw new UnsupportedOperationException("This operation is not currently supported.");
-        }
-		
-		public void callMe(String key, String value)
-		{
-			handleResponses();
-		}
-		
-		private void handleResponses()
-		{
-			try
-			{
-				readResponseResolver_.resolve(new ArrayList<Message>(responses_));
-			}
-			catch ( DigestMismatchException ex )
-			{
-				throw new RuntimeException(ex);
-			}
-		}
-	}
-
-	private static long scheduledTimeMillis_ = 600;
-	private static ICachetable<String, String> readRepairTable_ = new Cachetable<String, String>(scheduledTimeMillis_);
-	private final Row row_;
-	protected final List<EndPoint> replicas_;
-	private final ReadCommand readCommand_;
-
-    public ConsistencyManager(Row row, List<EndPoint> replicas, ReadCommand readCommand)
-    {
-        row_ = row;
-        replicas_ = replicas;
-        readCommand_ = readCommand;
-    }
-
-	public void run()
-	{
-        ReadCommand readCommandDigestOnly = constructReadMessage(true);
-		try
-		{
-			Message message = readCommandDigestOnly.makeReadMessage();
-            if (logger_.isDebugEnabled())
-              logger_.debug("Reading consistency digest for " + readCommand_.key + " from " + message.getMessageId() + "@[" + StringUtils.join(replicas_, ", ") + "]");
-            MessagingService.getMessagingInstance().sendRR(message, replicas_.toArray(new EndPoint[replicas_.size()]), new DigestResponseHandler());
-		}
-		catch (IOException ex)
-		{
-			throw new RuntimeException(ex);
-		}
-	}
-    
-    private ReadCommand constructReadMessage(boolean isDigestQuery)
-    {
-        ReadCommand readCommand = readCommand_.copy();
-        readCommand.setDigestQuery(isDigestQuery);
-        return readCommand;
-    }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ReadCommand;
+import org.apache.cassandra.db.ReadResponse;
+import org.apache.cassandra.db.Row;
+import org.apache.cassandra.io.DataInputBuffer;
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.net.IAsyncCallback;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.utils.Cachetable;
+import org.apache.cassandra.utils.ICacheExpungeHook;
+import org.apache.cassandra.utils.ICachetable;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+import org.apache.commons.lang.StringUtils;
+
+
+class ConsistencyManager implements Runnable
+{
+	private static Logger logger_ = Logger.getLogger(ConsistencyManager.class);
+	
+	class DigestResponseHandler implements IAsyncCallback
+	{
+		List<Message> responses_ = new ArrayList<Message>();
+		
+		public void response(Message msg)
+		{
+			responses_.add(msg);
+			if ( responses_.size() == ConsistencyManager.this.replicas_.size() )
+				handleDigestResponses();
+		}
+        
+        public void attachContext(Object o)
+        {
+            throw new UnsupportedOperationException("This operation is not currently supported.");
+        }
+		
+		private void handleDigestResponses()
+		{
+			DataInputBuffer bufIn = new DataInputBuffer();
+			for( Message response : responses_ )
+			{
+				byte[] body = response.getMessageBody();            
+	            bufIn.reset(body, body.length);
+	            try
+	            {	               
+	                ReadResponse result = ReadResponse.serializer().deserialize(bufIn);
+	                byte[] digest = result.digest();
+	                if( !Arrays.equals(row_.digest(), digest) )
+					{
+	                	doReadRepair();
+	                	break;
+					}
+	            }
+	            catch( IOException ex )
+	            {
+	            	logger_.info(LogUtil.throwableToString(ex));
+	            }
+			}
+		}
+		
+		private void doReadRepair() throws IOException
+		{
+			IResponseResolver<Row> readResponseResolver = new ReadResponseResolver();
+            /* Add the local storage endpoint to the replicas_ list */
+            replicas_.add(StorageService.getLocalStorageEndPoint());
+			IAsyncCallback responseHandler = new DataRepairHandler(ConsistencyManager.this.replicas_.size(), readResponseResolver);	
+            ReadCommand readCommand = constructReadMessage(false);
+            Message message = readCommand.makeReadMessage();
+            if (logger_.isDebugEnabled())
+              logger_.debug("Performing read repair for " + readCommand_.key + " to " + message.getMessageId() + "@[" + StringUtils.join(replicas_, ", ") + "]");
+			MessagingService.getMessagingInstance().sendRR(message, replicas_.toArray(new EndPoint[replicas_.size()]), responseHandler);
+		}
+	}
+	
+	class DataRepairHandler implements IAsyncCallback, ICacheExpungeHook<String, String>
+	{
+		private List<Message> responses_ = new ArrayList<Message>();
+		private IResponseResolver<Row> readResponseResolver_;
+		private int majority_;
+		
+		DataRepairHandler(int responseCount, IResponseResolver<Row> readResponseResolver)
+		{
+			readResponseResolver_ = readResponseResolver;
+			majority_ = (responseCount >> 1) + 1;  
+		}
+		
+		public void response(Message message)
+		{
+			if (logger_.isDebugEnabled())
+			  logger_.debug("Received responses in DataRepairHandler : " + message.toString());
+			responses_.add(message);
+			if ( responses_.size() == majority_ )
+			{
+				String messageId = message.getMessageId();
+				readRepairTable_.put(messageId, messageId, this);				
+			}
+		}
+        
+        public void attachContext(Object o)
+        {
+            throw new UnsupportedOperationException("This operation is not currently supported.");
+        }
+		
+		public void callMe(String key, String value)
+		{
+			handleResponses();
+		}
+		
+		private void handleResponses()
+		{
+			try
+			{
+				readResponseResolver_.resolve(new ArrayList<Message>(responses_));
+			}
+			catch ( DigestMismatchException ex )
+			{
+				throw new RuntimeException(ex);
+			}
+		}
+	}
+
+	private static long scheduledTimeMillis_ = 600;
+	private static ICachetable<String, String> readRepairTable_ = new Cachetable<String, String>(scheduledTimeMillis_);
+	private final Row row_;
+	protected final List<EndPoint> replicas_;
+	private final ReadCommand readCommand_;
+
+    public ConsistencyManager(Row row, List<EndPoint> replicas, ReadCommand readCommand)
+    {
+        row_ = row;
+        replicas_ = replicas;
+        readCommand_ = readCommand;
+    }
+
+	public void run()
+	{
+        ReadCommand readCommandDigestOnly = constructReadMessage(true);
+		try
+		{
+			Message message = readCommandDigestOnly.makeReadMessage();
+            if (logger_.isDebugEnabled())
+              logger_.debug("Reading consistency digest for " + readCommand_.key + " from " + message.getMessageId() + "@[" + StringUtils.join(replicas_, ", ") + "]");
+            MessagingService.getMessagingInstance().sendRR(message, replicas_.toArray(new EndPoint[replicas_.size()]), new DigestResponseHandler());
+		}
+		catch (IOException ex)
+		{
+			throw new RuntimeException(ex);
+		}
+	}
+    
+    private ReadCommand constructReadMessage(boolean isDigestQuery)
+    {
+        ReadCommand readCommand = readCommand_.copy();
+        readCommand.setDigestQuery(isDigestQuery);
+        return readCommand;
+    }
+}

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/DigestMismatchException.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/DigestMismatchException.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/DigestMismatchException.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/DigestMismatchException.java Thu Jul 30 15:30:21 2009
@@ -1,30 +1,30 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.service;
-/**
- * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
- */
-
-public class DigestMismatchException extends Exception
-{
-	public DigestMismatchException(String message)
-	{
-		super(message);
-	}
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class DigestMismatchException extends Exception
+{
+	public DigestMismatchException(String message)
+	{
+		super(message);
+	}
+}

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/IResponseResolver.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/IResponseResolver.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/IResponseResolver.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/IResponseResolver.java Thu Jul 30 15:30:21 2009
@@ -1,42 +1,42 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.service;
-
-import java.util.List;
-
-import org.apache.cassandra.net.Message;
-
-/**
- * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
- */
-
-public interface IResponseResolver<T> {
-
-	/*
-	 * This Method resolves the responses that are passed in . for example : if
-	 * its write response then all we get is true or false return values which
-	 * implies if the writes were successful but for reads its more complicated
-	 * you need to look at the responses and then based on differences schedule
-	 * repairs . Hence you need to derive a response resolver based on your
-	 * needs from this interface.
-	 */
-	public T resolve(List<Message> responses) throws DigestMismatchException;
-	public boolean isDataPresent(List<Message> responses);
-
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+import java.util.List;
+
+import org.apache.cassandra.net.Message;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public interface IResponseResolver<T> {
+
+	/*
+	 * This Method resolves the responses that are passed in . for example : if
+	 * its write response then all we get is true or false return values which
+	 * implies if the writes were successful but for reads its more complicated
+	 * you need to look at the responses and then based on differences schedule
+	 * repairs . Hence you need to derive a response resolver based on your
+	 * needs from this interface.
+	 */
+	public T resolve(List<Message> responses) throws DigestMismatchException;
+	public boolean isDataPresent(List<Message> responses);
+
+}

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/LoadDisseminator.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/LoadDisseminator.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/LoadDisseminator.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/LoadDisseminator.java Thu Jul 30 15:30:21 2009
@@ -1,48 +1,48 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.service;
-
-import java.util.TimerTask;
-
-import org.apache.cassandra.gms.ApplicationState;
-import org.apache.cassandra.gms.Gossiper;
-import org.apache.cassandra.utils.FileUtils;
-import org.apache.cassandra.utils.LogUtil;
-import org.apache.log4j.Logger;
-
-class LoadDisseminator extends TimerTask
-{
-    private final static Logger logger_ = Logger.getLogger(LoadDisseminator.class);
-    protected final static String loadInfo_= "LOAD-INFORMATION";
-    
-    public void run()
-    {
-        try
-        {
-            long diskSpace = FileUtils.getUsedDiskSpace();                
-            String diskUtilization = FileUtils.stringifyFileSize(diskSpace);
-            if (logger_.isDebugEnabled())
-              logger_.debug("Disseminating load info ...");
-            Gossiper.instance().addApplicationState(LoadDisseminator.loadInfo_, new ApplicationState(diskUtilization));
-        }
-        catch ( Throwable ex )
-        {
-            logger_.warn( LogUtil.throwableToString(ex) );
-        }
-    }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.service;
+
+import java.util.TimerTask;
+
+import org.apache.cassandra.gms.ApplicationState;
+import org.apache.cassandra.gms.Gossiper;
+import org.apache.cassandra.utils.FileUtils;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+
+class LoadDisseminator extends TimerTask
+{
+    private final static Logger logger_ = Logger.getLogger(LoadDisseminator.class);
+    protected final static String loadInfo_= "LOAD-INFORMATION";
+    
+    public void run()
+    {
+        try
+        {
+            long diskSpace = FileUtils.getUsedDiskSpace();                
+            String diskUtilization = FileUtils.stringifyFileSize(diskSpace);
+            if (logger_.isDebugEnabled())
+              logger_.debug("Disseminating load info ...");
+            Gossiper.instance().addApplicationState(LoadDisseminator.loadInfo_, new ApplicationState(diskUtilization));
+        }
+        catch ( Throwable ex )
+        {
+            logger_.warn( LogUtil.throwableToString(ex) );
+        }
+    }
+}

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/LoadInfo.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/LoadInfo.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/LoadInfo.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/LoadInfo.java Thu Jul 30 15:30:21 2009
@@ -1,64 +1,64 @@
- /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.service;
-
-import java.util.Comparator;
-
-import org.apache.cassandra.utils.FileUtils;
-
-
-class LoadInfo
-{
-    protected static class DiskSpaceComparator implements Comparator<LoadInfo>
-    {
-        public int compare(LoadInfo li, LoadInfo li2)
-        {
-            if ( li == null || li2 == null )
-                throw new IllegalArgumentException("Cannot pass in values that are NULL.");
-            
-            double space = FileUtils.stringToFileSize(li.diskSpace_);
-            double space2 = FileUtils.stringToFileSize(li2.diskSpace_);
-            return (int)(space - space2);
-        }
-    }
-        
-    private String diskSpace_;
-    
-    LoadInfo(long diskSpace)
-    {       
-        diskSpace_ = FileUtils.stringifyFileSize(diskSpace);
-    }
-    
-    LoadInfo(String loadInfo)
-    {
-        diskSpace_ = loadInfo;
-    }
-    
-    String diskSpace()
-    {
-        return diskSpace_;
-    }
-    
-    public String toString()
-    {
-        StringBuilder sb = new StringBuilder("");       
-        sb.append(diskSpace_);
-        return sb.toString();
-    }
-}
+ /**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+import java.util.Comparator;
+
+import org.apache.cassandra.utils.FileUtils;
+
+
+/**
+ * Holds a load figure as a disk-space string, either supplied directly or produced
+ * from a byte count by FileUtils.stringifyFileSize.
+ */
+class LoadInfo
+{
+    /**
+     * Orders LoadInfo instances by the numeric value that FileUtils.stringToFileSize
+     * parses out of their disk-space strings, smallest first.
+     */
+    protected static class DiskSpaceComparator implements Comparator<LoadInfo>
+    {
+        /**
+         * @param li  first instance to compare
+         * @param li2 second instance to compare
+         * @return a negative, zero or positive value as li's size is less than,
+         *         equal to or greater than li2's
+         * @throws IllegalArgumentException if either argument is null
+         */
+        public int compare(LoadInfo li, LoadInfo li2)
+        {
+            if ( li == null || li2 == null )
+                throw new IllegalArgumentException("Cannot pass in values that are NULL.");
+
+            double space = FileUtils.stringToFileSize(li.diskSpace_);
+            double space2 = FileUtils.stringToFileSize(li2.diskSpace_);
+            // Do not cast the difference to int: that truncates toward zero, so sizes
+            // less than 1 apart would compare as equal and break transitivity.
+            return Double.compare(space, space2);
+        }
+    }
+
+    /** Disk space in string form; null only if a null string was passed in. */
+    private final String diskSpace_;
+
+    /**
+     * @param diskSpace size to be converted with FileUtils.stringifyFileSize
+     */
+    LoadInfo(long diskSpace)
+    {
+        diskSpace_ = FileUtils.stringifyFileSize(diskSpace);
+    }
+
+    /**
+     * @param loadInfo disk-space string, stored as given without validation
+     */
+    LoadInfo(String loadInfo)
+    {
+        diskSpace_ = loadInfo;
+    }
+
+    /**
+     * @return the stored disk-space string
+     */
+    String diskSpace()
+    {
+        return diskSpace_;
+    }
+
+    /**
+     * @return the stored disk-space string, or "null" if none was stored
+     */
+    public String toString()
+    {
+        return String.valueOf(diskSpace_);
+    }
+}



Mime
View raw message