cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbel...@apache.org
Subject svn commit: r766838 - in /incubator/cassandra/trunk: src/org/apache/cassandra/db/ src/org/apache/cassandra/service/ src/org/apache/cassandra/test/ test/org/apache/cassandra/db/
Date Mon, 20 Apr 2009 20:05:08 GMT
Author: jbellis
Date: Mon Apr 20 20:05:07 2009
New Revision: 766838

URL: http://svn.apache.org/viewvc?rev=766838&view=rev
Log:
rename get_cf -> readColumnFamily; ReadMessage -> ReadCommand.
[Message message = ReadMessage.readMessage(readMessage) is just plain confusing]
patch by jbellis; reviewed by Eric Evans for #88

Added:
    incubator/cassandra/trunk/src/org/apache/cassandra/db/ReadCommand.java
      - copied, changed from r766140, incubator/cassandra/trunk/src/org/apache/cassandra/db/ReadMessage.java
Removed:
    incubator/cassandra/trunk/src/org/apache/cassandra/db/ReadMessage.java
Modified:
    incubator/cassandra/trunk/src/org/apache/cassandra/db/ReadVerbHandler.java
    incubator/cassandra/trunk/src/org/apache/cassandra/service/CassandraServer.java
    incubator/cassandra/trunk/src/org/apache/cassandra/service/ConsistencyManager.java
    incubator/cassandra/trunk/src/org/apache/cassandra/service/MultiQuorumResponseHandler.java
    incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageProxy.java
    incubator/cassandra/trunk/src/org/apache/cassandra/test/DataImporter.java
    incubator/cassandra/trunk/src/org/apache/cassandra/test/StressTest.java
    incubator/cassandra/trunk/test/org/apache/cassandra/db/ReadMessageTest.java

Copied: incubator/cassandra/trunk/src/org/apache/cassandra/db/ReadCommand.java (from r766140,
incubator/cassandra/trunk/src/org/apache/cassandra/db/ReadMessage.java)
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/ReadCommand.java?p2=incubator/cassandra/trunk/src/org/apache/cassandra/db/ReadCommand.java&p1=incubator/cassandra/trunk/src/org/apache/cassandra/db/ReadMessage.java&r1=766140&r2=766838&rev=766838&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/ReadMessage.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/ReadCommand.java Mon Apr 20 20:05:07
2009
@@ -28,7 +28,6 @@
 
 import org.apache.commons.lang.StringUtils;
 
-import org.apache.cassandra.continuations.Suspendable;
 import org.apache.cassandra.io.ICompactSerializer;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.service.StorageService;
@@ -38,26 +37,26 @@
  * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com
)
  */
 
-public class ReadMessage implements Serializable
+public class ReadCommand implements Serializable
 {
-    private static ICompactSerializer<ReadMessage> serializer_;	
+    private static ICompactSerializer<ReadCommand> serializer_;
     public static final String doRepair_ = "READ-REPAIR";
-	
+
     static
     {
-        serializer_ = new ReadMessageSerializer();
+        serializer_ = new ReadCommandSerializer();
     }
 
-    static ICompactSerializer<ReadMessage> serializer()
+    static ICompactSerializer<ReadCommand> serializer()
     {
         return serializer_;
     }
     
-    public static Message makeReadMessage(ReadMessage readMessage) throws IOException
+    public static Message makeReadMessage(ReadCommand readCommand) throws IOException
     {
     	ByteArrayOutputStream bos = new ByteArrayOutputStream();
         DataOutputStream dos = new DataOutputStream( bos );
-        ReadMessage.serializer().serialize(readMessage, dos);
+        ReadCommand.serializer().serialize(readCommand, dos);
         Message message = new Message(StorageService.getLocalStorageEndPoint(), StorageService.readStage_,
StorageService.readVerbHandler_, new Object[]{bos.toByteArray()});         
         return message;
     }
@@ -71,24 +70,24 @@
     private List<String> columns_ = new ArrayList<String>();
     private boolean isDigestQuery_ = false;
         
-    private ReadMessage()
+    private ReadCommand()
     {
     }
     
-    public ReadMessage(String table, String key)
+    public ReadCommand(String table, String key)
     {
         table_ = table;
         key_ = key;
     }
 
-    public ReadMessage(String table, String key, String columnFamily_column)
+    public ReadCommand(String table, String key, String columnFamily_column)
     {
         table_ = table;
         key_ = key;
         columnFamily_column_ = columnFamily_column;
     }
     
-    public ReadMessage(String table, String key, String columnFamily, List<String>
columns)
+    public ReadCommand(String table, String key, String columnFamily, List<String>
columns)
     {
     	table_ = table;
     	key_ = key;
@@ -96,7 +95,7 @@
     	columns_ = columns;
     }
     
-    public ReadMessage(String table, String key, String columnFamily_column, int start, int
count)
+    public ReadCommand(String table, String key, String columnFamily_column, int start, int
count)
     {
         table_ = table;
         key_ = key;
@@ -105,7 +104,7 @@
         count_ = count;
     }
 
-    public ReadMessage(String table, String key, String columnFamily_column, long sinceTimestamp)
+    public ReadCommand(String table, String key, String columnFamily_column, long sinceTimestamp)
     {
         table_ = table;
         key_ = key;
@@ -173,9 +172,9 @@
     }
 }
 
-class ReadMessageSerializer implements ICompactSerializer<ReadMessage>
+class ReadCommandSerializer implements ICompactSerializer<ReadCommand>
 {
-	public void serialize(ReadMessage rm, DataOutputStream dos) throws IOException
+	public void serialize(ReadCommand rm, DataOutputStream dos) throws IOException
 	{
 		dos.writeUTF(rm.table());
 		dos.writeUTF(rm.key());
@@ -196,7 +195,7 @@
 		}
 	}
 	
-    public ReadMessage deserialize(DataInputStream dis) throws IOException
+    public ReadCommand deserialize(DataInputStream dis) throws IOException
     {
 		String table = dis.readUTF();
 		String key = dis.readUTF();
@@ -214,18 +213,18 @@
 			dis.readFully(bytes);
 			columns.add( new String(bytes) );
 		}
-		ReadMessage rm = null;
+		ReadCommand rm = null;
 		if ( columns.size() > 0 )
 		{
-			rm = new ReadMessage(table, key, columnFamily_column, columns);
+			rm = new ReadCommand(table, key, columnFamily_column, columns);
 		}
 		else if( sinceTimestamp > 0 )
 		{
-			rm = new ReadMessage(table, key, columnFamily_column, sinceTimestamp);
+			rm = new ReadCommand(table, key, columnFamily_column, sinceTimestamp);
 		}
 		else
 		{
-			rm = new ReadMessage(table, key, columnFamily_column, start, count);
+			rm = new ReadCommand(table, key, columnFamily_column, start, count);
 		}
 		rm.setIsDigestQuery(isDigest);
     	return rm;

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/ReadVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/ReadVerbHandler.java?rev=766838&r1=766837&r2=766838&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/ReadVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/ReadVerbHandler.java Mon Apr 20
20:05:07 2009
@@ -19,12 +19,9 @@
 package org.apache.cassandra.db;
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Collection;
 import java.util.List;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.continuations.Suspendable;
 import org.apache.cassandra.io.DataInputBuffer;
 import org.apache.cassandra.io.DataOutputBuffer;
 import org.apache.cassandra.net.EndPoint;
@@ -34,8 +31,6 @@
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.LogUtil;
 import org.apache.log4j.Logger;
-import org.apache.cassandra.net.*;
-import org.apache.cassandra.utils.*;
 
 /**
  * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com
)
@@ -77,30 +72,30 @@
 
         try
         {
-            ReadMessage readMessage = ReadMessage.serializer().deserialize(readCtx.bufIn_);
-            Table table = Table.open(readMessage.table());
+            ReadCommand readCommand = ReadCommand.serializer().deserialize(readCtx.bufIn_);
+            Table table = Table.open(readCommand.table());
             Row row = null;
             long start = System.currentTimeMillis();
-            if( readMessage.columnFamily_column() == null )
-            	row = table.get(readMessage.key());
+            if( readCommand.columnFamily_column() == null )
+            	row = table.get(readCommand.key());
             else
             {
-            	if(readMessage.getColumnNames().size() == 0)
+            	if(readCommand.getColumnNames().size() == 0)
             	{
-	            	if(readMessage.count() > 0 && readMessage.start() >= 0)
-	            		row = table.getRow(readMessage.key(), readMessage.columnFamily_column(), readMessage.start(),
readMessage.count());
+	            	if(readCommand.count() > 0 && readCommand.start() >= 0)
+	            		row = table.getRow(readCommand.key(), readCommand.columnFamily_column(), readCommand.start(),
readCommand.count());
 	            	else
-	            		row = table.getRow(readMessage.key(), readMessage.columnFamily_column());
+	            		row = table.getRow(readCommand.key(), readCommand.columnFamily_column());
             	}
             	else
             	{
-            		row = table.getRow(readMessage.key(), readMessage.columnFamily_column(), readMessage.getColumnNames());
           		
+            		row = table.getRow(readCommand.key(), readCommand.columnFamily_column(), readCommand.getColumnNames());
             	}
             }              
             logger_.info("getRow()  TIME: " + (System.currentTimeMillis() - start) + " ms.");
             start = System.currentTimeMillis();
             ReadResponseMessage readResponseMessage = null;
-            if(readMessage.isDigestQuery())
+            if(readCommand.isDigestQuery())
             {
                 readResponseMessage = new ReadResponseMessage(table.getTableName(), row.digest());
             }
@@ -108,7 +103,7 @@
             {
                 readResponseMessage = new ReadResponseMessage(table.getTableName(), row);
             }
-            readResponseMessage.setIsDigestQuery(readMessage.isDigestQuery());
+            readResponseMessage.setIsDigestQuery(readCommand.isDigestQuery());
             /* serialize the ReadResponseMessage. */
             readCtx.bufOut_.reset();
 
@@ -126,9 +121,9 @@
             logger_.info("ReadVerbHandler  TIME 2: " + (System.currentTimeMillis() - start)
+ " ms.");
             
             /* Do read repair if header of the message says so */
-            String repair = new String( message.getHeader(ReadMessage.doRepair_) );
-            if ( repair.equals( ReadMessage.doRepair_ ) )
-                doReadRepair(row, readMessage);
+            String repair = new String( message.getHeader(ReadCommand.doRepair_) );
+            if ( repair.equals( ReadCommand.doRepair_ ) )
+                doReadRepair(row, readCommand);
         }
         catch ( IOException ex)
         {
@@ -140,29 +135,29 @@
         }
     }
     
-    private void doReadRepair(Row row, ReadMessage readMessage)
+    private void doReadRepair(Row row, ReadCommand readCommand)
     {
         if ( DatabaseDescriptor.getConsistencyCheck() )
         {
-            List<EndPoint> endpoints = StorageService.instance().getNLiveStorageEndPoint(readMessage.key());
+            List<EndPoint> endpoints = StorageService.instance().getNLiveStorageEndPoint(readCommand.key());
             /* Remove the local storage endpoint from the list. */ 
             endpoints.remove( StorageService.getLocalStorageEndPoint() );
             
-            if(readMessage.getColumnNames().size() == 0)
+            if(readCommand.getColumnNames().size() == 0)
             {
-                if( readMessage.start() >= 0 && readMessage.count() < Integer.MAX_VALUE)
+                if( readCommand.start() >= 0 && readCommand.count() < Integer.MAX_VALUE)
                 {                
-                    StorageService.instance().doConsistencyCheck(row, endpoints, readMessage.columnFamily_column(),
readMessage.start(), readMessage.count());                    
+                    StorageService.instance().doConsistencyCheck(row, endpoints, readCommand.columnFamily_column(),
readCommand.start(), readCommand.count());
                 }
                 
-                if( readMessage.sinceTimestamp() > 0)
+                if( readCommand.sinceTimestamp() > 0)
                 {                    
-                    StorageService.instance().doConsistencyCheck(row, endpoints, readMessage.columnFamily_column(),
readMessage.sinceTimestamp());                    
+                    StorageService.instance().doConsistencyCheck(row, endpoints, readCommand.columnFamily_column(),
readCommand.sinceTimestamp());
                 }                
             }
             else
             {
-                StorageService.instance().doConsistencyCheck(row, endpoints, readMessage.columnFamily_column(),
readMessage.getColumnNames());                                
+                StorageService.instance().doConsistencyCheck(row, endpoints, readCommand.columnFamily_column(),
readCommand.getColumnNames());
             }
         }
     }     

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/service/CassandraServer.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/service/CassandraServer.java?rev=766838&r1=766837&r2=766838&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/service/CassandraServer.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/service/CassandraServer.java Mon Apr
20 20:05:07 2009
@@ -37,7 +37,6 @@
 import org.apache.cassandra.db.IColumn;
 import org.apache.cassandra.db.Row;
 import org.apache.cassandra.db.RowMutation;
-import org.apache.cassandra.db.Column;
 import org.apache.cassandra.utils.LogUtil;
 import org.apache.thrift.TException;
 
@@ -81,7 +80,7 @@
 		}
 	}
     
-	protected ColumnFamily get_cf(String tablename, String key, String columnFamily, List<String>
columNames) throws CassandraException, TException
+	protected ColumnFamily readColumnFamily(String tablename, String key, String columnFamily,
List<String> columNames) throws CassandraException, TException
 	{
     	ColumnFamily cfamily = null;
 		try
@@ -205,7 +204,7 @@
 		try
 		{
 			validateTable(tablename);
-			ColumnFamily cfamily = get_cf(tablename, key, columnFamily, columnNames);
+			ColumnFamily cfamily = readColumnFamily(tablename, key, columnFamily, columnNames);
 			if (cfamily == null)
 			{
 				logger_.info("ERROR ColumnFamily " + columnFamily + " is missing.....: "
@@ -486,7 +485,7 @@
 		try
 		{
 			validateTable(tablename);
-			ColumnFamily cfamily = get_cf(tablename, key, columnFamily, superColumnNames);
+			ColumnFamily cfamily = readColumnFamily(tablename, key, columnFamily, superColumnNames);
 			if (cfamily == null)
 			{
 				logger_.info("ERROR ColumnFamily " + columnFamily + " is missing.....: "+"   key:" +
key

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/service/ConsistencyManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/service/ConsistencyManager.java?rev=766838&r1=766837&r2=766838&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/service/ConsistencyManager.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/service/ConsistencyManager.java Mon
Apr 20 20:05:07 2009
@@ -22,13 +22,9 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
 
-import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
-import org.apache.cassandra.concurrent.ThreadFactoryImpl;
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.ReadMessage;
+import org.apache.cassandra.db.ReadCommand;
 import org.apache.cassandra.db.ReadResponseMessage;
 import org.apache.cassandra.db.Row;
 import org.apache.cassandra.io.DataInputBuffer;
@@ -96,9 +92,9 @@
             replicas_.add(StorageService.getLocalStorageEndPoint());
 			IAsyncCallback responseHandler = new DataRepairHandler(ConsistencyManager.this.replicas_.size(),
readResponseResolver);	
 			String table = DatabaseDescriptor.getTables().get(0);
-            ReadMessage readMessage = constructReadMessage(false);
+            ReadCommand readCommand = constructReadMessage(false);
 			// ReadMessage readMessage = new ReadMessage(table, row_.key(), columnFamily_);
-            Message message = ReadMessage.makeReadMessage(readMessage);
+            Message message = ReadCommand.makeReadMessage(readCommand);
 			MessagingService.getMessagingInstance().sendRR(message, replicas_.toArray( new EndPoint[0]
), responseHandler);			
 		}
 	}
@@ -187,10 +183,10 @@
 	public void run()
 	{
 		logger_.debug(" Run the consistency checks for " + columnFamily_);		
-        ReadMessage readMessageDigestOnly = constructReadMessage(true);
+        ReadCommand readCommandDigestOnly = constructReadMessage(true);
 		try
 		{
-			Message messageDigestOnly = ReadMessage.makeReadMessage(readMessageDigestOnly);
+			Message messageDigestOnly = ReadCommand.makeReadMessage(readCommandDigestOnly);
 			IAsyncCallback digestResponseHandler = new DigestResponseHandler();
 			MessagingService.getMessagingInstance().sendRR(messageDigestOnly, replicas_.toArray(new
EndPoint[0]), digestResponseHandler);
 		}
@@ -200,32 +196,32 @@
 		}
 	}
     
-    private ReadMessage constructReadMessage(boolean isDigestQuery)
+    private ReadCommand constructReadMessage(boolean isDigestQuery)
     {
-        ReadMessage readMessage = null;
+        ReadCommand readCommand = null;
         String table = DatabaseDescriptor.getTables().get(0);
         
         if(columnNames_.size() == 0)
         {
             if( start_ >= 0 && count_ < Integer.MAX_VALUE)
             {
-                readMessage = new ReadMessage(table, row_.key(), columnFamily_, start_, count_);
+                readCommand = new ReadCommand(table, row_.key(), columnFamily_, start_, count_);
             }
             else if(sinceTimestamp_ > 0)
             {
-                readMessage = new ReadMessage(table, row_.key(), columnFamily_, sinceTimestamp_);
+                readCommand = new ReadCommand(table, row_.key(), columnFamily_, sinceTimestamp_);
             }
             else
             {
-                readMessage = new ReadMessage(table, row_.key(), columnFamily_);
+                readCommand = new ReadCommand(table, row_.key(), columnFamily_);
             }
         }
         else
         {
-            readMessage = new ReadMessage(table, row_.key(), columnFamily_, columnNames_);
+            readCommand = new ReadCommand(table, row_.key(), columnFamily_, columnNames_);
             
         }
-        readMessage.setIsDigestQuery(isDigestQuery);
-        return readMessage;
+        readCommand.setIsDigestQuery(isDigestQuery);
+        return readCommand;
     }
 }

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/service/MultiQuorumResponseHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/service/MultiQuorumResponseHandler.java?rev=766838&r1=766837&r2=766838&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/service/MultiQuorumResponseHandler.java
(original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/service/MultiQuorumResponseHandler.java
Mon Apr 20 20:05:07 2009
@@ -28,7 +28,7 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.ReadMessage;
+import org.apache.cassandra.db.ReadCommand;
 import org.apache.cassandra.db.Row;
 import org.apache.cassandra.net.EndPoint;
 import org.apache.cassandra.net.IAsyncCallback;
@@ -47,7 +47,7 @@
     private Lock lock_ = new ReentrantLock();
     private Condition condition_;
     /* This maps the keys to the original data read messages */
-    private Map<String, ReadMessage> readMessages_ = new HashMap<String, ReadMessage>();
+    private Map<String, ReadCommand> readMessages_ = new HashMap<String, ReadCommand>();
     /* This maps the key to its set of replicas */
     private Map<String, EndPoint[]> endpoints_ = new HashMap<String, EndPoint[]>();
     /* This maps the groupId to the individual callback for the set of messages */
@@ -129,10 +129,10 @@
         {
             if ( DatabaseDescriptor.getConsistencyCheck())
             {                                
-                ReadMessage readMessage = readMessages_.get(key);
-                readMessage.setIsDigestQuery(false);            
-                Message messageRepair = ReadMessage.makeReadMessage(readMessage);
-                EndPoint[] endpoints = MultiQuorumResponseHandler.this.endpoints_.get( readMessage.key()
);
+                ReadCommand readCommand = readMessages_.get(key);
+                readCommand.setIsDigestQuery(false);
+                Message messageRepair = ReadCommand.makeReadMessage(readCommand);
+                EndPoint[] endpoints = MultiQuorumResponseHandler.this.endpoints_.get( readCommand.key()
);
                 Message[][] messages = new Message[][]{ {messageRepair, messageRepair, messageRepair}
};
                 EndPoint[][] epList = new EndPoint[][]{ endpoints };
                 MessagingService.getMessagingInstance().sendRR(messages, epList, MultiQuorumResponseHandler.this);
               
@@ -140,7 +140,7 @@
         }
     }
     
-    public MultiQuorumResponseHandler(Map<String, ReadMessage> readMessages, Map<String,
EndPoint[]> endpoints)
+    public MultiQuorumResponseHandler(Map<String, ReadCommand> readMessages, Map<String,
EndPoint[]> endpoints)
     {        
         condition_ = lock_.newCondition();
         readMessages_ = readMessages;

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageProxy.java?rev=766838&r1=766837&r2=766838&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageProxy.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageProxy.java Mon Apr 20
20:05:07 2009
@@ -30,16 +30,14 @@
 import org.apache.commons.lang.StringUtils;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.ReadMessage;
+import org.apache.cassandra.db.ReadCommand;
 import org.apache.cassandra.db.ReadResponseMessage;
 import org.apache.cassandra.db.Row;
 import org.apache.cassandra.db.RowMutation;
-import org.apache.cassandra.db.RowMutationMessage;
 import org.apache.cassandra.db.Table;
 import org.apache.cassandra.db.TouchMessage;
 import org.apache.cassandra.io.DataInputBuffer;
 import org.apache.cassandra.net.EndPoint;
-import org.apache.cassandra.net.IAsyncCallback;
 import org.apache.cassandra.net.IAsyncResult;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
@@ -145,13 +143,13 @@
         }
     }
     
-    private static Map<String, Message> constructMessages(Map<String, ReadMessage>
readMessages) throws IOException
+    private static Map<String, Message> constructMessages(Map<String, ReadCommand>
readMessages) throws IOException
     {
         Map<String, Message> messages = new HashMap<String, Message>();
         Set<String> keys = readMessages.keySet();        
         for ( String key : keys )
         {
-            Message message = ReadMessage.makeReadMessage( readMessages.get(key) );
+            Message message = ReadCommand.makeReadMessage( readMessages.get(key) );
             messages.put(key, message);
         }        
         return messages;
@@ -182,7 +180,7 @@
      * @throws IOException
      * @throws TimeoutException
      */
-    public static Map<String, Row> doReadProtocol(Map<String, ReadMessage> readMessages)
throws IOException,TimeoutException
+    public static Map<String, Row> doReadProtocol(Map<String, ReadCommand> readMessages)
throws IOException,TimeoutException
     {
         Map<String, Row> rows = new HashMap<String, Row>();
         Set<String> keys = readMessages.keySet();
@@ -206,14 +204,14 @@
         return rows;
     }
     
-    public static Row doReadProtocol(String key, ReadMessage readMessage) throws IOException,TimeoutException
+    public static Row doReadProtocol(String key, ReadCommand readCommand) throws IOException,TimeoutException
     {
         Row row = null;
         EndPoint endPoint = StorageService.instance().findSuitableEndPoint(key);        
         if(endPoint != null)
         {
-            Message message = ReadMessage.makeReadMessage(readMessage);
-            message.addHeader(ReadMessage.doRepair_, ReadMessage.doRepair_.getBytes());
+            Message message = ReadCommand.makeReadMessage(readCommand);
+            message.addHeader(ReadCommand.doRepair_, ReadCommand.doRepair_.getBytes());
             IAsyncResult iar = MessagingService.getMessagingInstance().sendRR(message, endPoint);
             Object[] result = iar.get(DatabaseDescriptor.getRpcTimeout(), TimeUnit.MILLISECONDS);
             byte[] body = (byte[])result[0];
@@ -310,9 +308,9 @@
         }   
         if(!foundLocal && consistencyLevel == StorageService.ConsistencyLevel.WEAK)
         {
-            ReadMessage readMessage = null;
-            readMessage = new ReadMessage(tablename, key, columnFamily, columnNames);
-            return doReadProtocol(key, readMessage);
+            ReadCommand readCommand = null;
+            readCommand = new ReadCommand(tablename, key, columnFamily, columnNames);
+            return doReadProtocol(key, readCommand);
         }
         else
         {
@@ -349,9 +347,9 @@
         }   
         if(!foundLocal && consistencyLevel == StorageService.ConsistencyLevel.WEAK)
         {
-            ReadMessage readMessage = null;
-            readMessage = new ReadMessage(tablename, key, columnFamily, start, count);
-            return doReadProtocol(key, readMessage);
+            ReadCommand readCommand = null;
+            readCommand = new ReadCommand(tablename, key, columnFamily, start, count);
+            return doReadProtocol(key, readCommand);
         }
         else
         {
@@ -408,9 +406,9 @@
         }   
         if(!foundLocal && consistencyLevel == StorageService.ConsistencyLevel.WEAK)
         {
-            ReadMessage readMessage = null;
-            readMessage = new ReadMessage(tablename, key, columnFamily, sinceTimestamp);
-            return doReadProtocol(key, readMessage);
+            ReadCommand readCommand = null;
+            readCommand = new ReadCommand(tablename, key, columnFamily, sinceTimestamp);
+            return doReadProtocol(key, readCommand);
         }
         else
         {
@@ -436,12 +434,12 @@
     {       
         long startTime = System.currentTimeMillis();        
         // TODO: throw a thrift exception if we do not have N nodes
-        ReadMessage readMessage = new ReadMessage(tablename, key, columnFamily, columns);
              
+        ReadCommand readCommand = new ReadCommand(tablename, key, columnFamily, columns);
         
-        ReadMessage readMessageDigestOnly = new ReadMessage(tablename, key, columnFamily,
columns);     
-        readMessageDigestOnly.setIsDigestQuery(true);        
+        ReadCommand readCommandDigestOnly = new ReadCommand(tablename, key, columnFamily,
columns);
+        readCommandDigestOnly.setIsDigestQuery(true);
         
-        Row row = StorageProxy.doStrongReadProtocol(key, readMessage, readMessageDigestOnly);
+        Row row = StorageProxy.doStrongReadProtocol(key, readCommand, readCommandDigestOnly);
         logger_.debug("readProtocol: " + (System.currentTimeMillis() - startTime) + " ms.");
    
         return row;
     }
@@ -466,27 +464,27 @@
     {       
         long startTime = System.currentTimeMillis();        
         // TODO: throw a thrift exception if we do not have N nodes
-        ReadMessage readMessage = null;
-        ReadMessage readMessageDigestOnly = null;
+        ReadCommand readCommand = null;
+        ReadCommand readCommandDigestOnly = null;
         if( start >= 0 && count < Integer.MAX_VALUE)
         {
-            readMessage = new ReadMessage(tablename, key, columnFamily, start, count);
+            readCommand = new ReadCommand(tablename, key, columnFamily, start, count);
         }
         else
         {
-            readMessage = new ReadMessage(tablename, key, columnFamily);
+            readCommand = new ReadCommand(tablename, key, columnFamily);
         }
-        Message message = ReadMessage.makeReadMessage(readMessage);
+        Message message = ReadCommand.makeReadMessage(readCommand);
         if( start >= 0 && count < Integer.MAX_VALUE)
         {
-            readMessageDigestOnly = new ReadMessage(tablename, key, columnFamily, start,
count);
+            readCommandDigestOnly = new ReadCommand(tablename, key, columnFamily, start,
count);
         }
         else
         {
-            readMessageDigestOnly = new ReadMessage(tablename, key, columnFamily);
+            readCommandDigestOnly = new ReadCommand(tablename, key, columnFamily);
         }
-        readMessageDigestOnly.setIsDigestQuery(true);        
-        Row row = doStrongReadProtocol(key, readMessage, readMessageDigestOnly);
+        readCommandDigestOnly.setIsDigestQuery(true);
+        Row row = doStrongReadProtocol(key, readCommand, readCommandDigestOnly);
         logger_.debug("readProtocol: " + (System.currentTimeMillis() - startTime) + " ms.");
         return row;
     }
@@ -507,27 +505,27 @@
         Map<String, Row> rows = new HashMap<String, Row>();
         long startTime = System.currentTimeMillis();        
         // TODO: throw a thrift exception if we do not have N nodes
-        Map<String, ReadMessage[]> readMessages = new HashMap<String, ReadMessage[]>();
       
+        Map<String, ReadCommand[]> readMessages = new HashMap<String, ReadCommand[]>();
         for (String key : keys )
         {
-            ReadMessage[] readMessage = new ReadMessage[2];
+            ReadCommand[] readCommand = new ReadCommand[2];
             if( start >= 0 && count < Integer.MAX_VALUE)
             {
-                readMessage[0] = new ReadMessage(tablename, key, columnFamily, start, count);
+                readCommand[0] = new ReadCommand(tablename, key, columnFamily, start, count);
             }
             else
             {
-                readMessage[0] = new ReadMessage(tablename, key, columnFamily);
+                readCommand[0] = new ReadCommand(tablename, key, columnFamily);
             }            
             if( start >= 0 && count < Integer.MAX_VALUE)
             {
-                readMessage[1] = new ReadMessage(tablename, key, columnFamily, start, count);
+                readCommand[1] = new ReadCommand(tablename, key, columnFamily, start, count);
             }
             else
             {
-                readMessage[1] = new ReadMessage(tablename, key, columnFamily);
+                readCommand[1] = new ReadCommand(tablename, key, columnFamily);
             }
-            readMessage[1].setIsDigestQuery(true);
+            readCommand[1].setIsDigestQuery(true);
         }        
         rows = doStrongReadProtocol(readMessages);         
         logger_.debug("readProtocol: " + (System.currentTimeMillis() - startTime) + " ms.");
@@ -538,13 +536,13 @@
     {       
         long startTime = System.currentTimeMillis();        
         // TODO: throw a thrift exception if we do not have N nodes
-        ReadMessage readMessage = null;
-        ReadMessage readMessageDigestOnly = null;
-        readMessage = new ReadMessage(tablename, key, columnFamily, sinceTimestamp);
-        Message message = ReadMessage.makeReadMessage(readMessage);
-        readMessageDigestOnly = new ReadMessage(tablename, key, columnFamily, sinceTimestamp);
-        readMessageDigestOnly.setIsDigestQuery(true);        
-        Row row = doStrongReadProtocol(key, readMessage, readMessageDigestOnly);
+        ReadCommand readCommand = null;
+        ReadCommand readCommandDigestOnly = null;
+        readCommand = new ReadCommand(tablename, key, columnFamily, sinceTimestamp);
+        Message message = ReadCommand.makeReadMessage(readCommand);
+        readCommandDigestOnly = new ReadCommand(tablename, key, columnFamily, sinceTimestamp);
+        readCommandDigestOnly.setIsDigestQuery(true);
+        Row row = doStrongReadProtocol(key, readCommand, readCommandDigestOnly);
         logger_.debug("readProtocol: " + (System.currentTimeMillis() - startTime) + " ms.");
         return row;
     }
@@ -555,11 +553,11 @@
      *  param @ readMessage - the read message to get the actual data
      *  param @ readMessageDigest - the read message to get the digest.
     */
-    private static Row doStrongReadProtocol(String key, ReadMessage readMessage, ReadMessage
readMessageDigest) throws IOException, TimeoutException
+    private static Row doStrongReadProtocol(String key, ReadCommand readCommand, ReadCommand
readCommandDigest) throws IOException, TimeoutException
     {
         Row row = null;
-        Message message = ReadMessage.makeReadMessage(readMessage);
-        Message messageDigestOnly = ReadMessage.makeReadMessage(readMessageDigest);
+        Message message = ReadCommand.makeReadMessage(readCommand);
+        Message messageDigestOnly = ReadCommand.makeReadMessage(readCommandDigest);
         
         IResponseResolver<Row> readResponseResolver = new ReadResponseResolver();
         QuorumResponseHandler<Row> quorumResponseHandler = new QuorumResponseHandler<Row>(
@@ -607,9 +605,9 @@
 	            QuorumResponseHandler<Row> quorumResponseHandlerRepair = new QuorumResponseHandler<Row>(
 	                    DatabaseDescriptor.getReplicationFactor(),
 	                    readResponseResolverRepair);
-	            readMessage.setIsDigestQuery(false);
+	            readCommand.setIsDigestQuery(false);
 	            logger_.info("DigestMismatchException: " + key);            
-	            Message messageRepair = ReadMessage.makeReadMessage(readMessage);
+	            Message messageRepair = ReadCommand.makeReadMessage(readCommand);
 	            MessagingService.getMessagingInstance().sendRR(messageRepair, endPoints, quorumResponseHandlerRepair);
 	            try
 	            {
@@ -628,7 +626,7 @@
         return row;
     }
     
-    private static Map<String, Message[]> constructReplicaMessages(Map<String, ReadMessage[]>
readMessages) throws IOException
+    private static Map<String, Message[]> constructReplicaMessages(Map<String, ReadCommand[]>
readMessages) throws IOException
     {
         Map<String, Message[]> messages = new HashMap<String, Message[]>();
         Set<String> keys = readMessages.keySet();
@@ -636,21 +634,21 @@
         for ( String key : keys )
         {
             Message[] msg = new Message[DatabaseDescriptor.getReplicationFactor()];
-            ReadMessage[] readMessage = readMessages.get(key);
-            msg[0] = ReadMessage.makeReadMessage( readMessage[0] );            
+            ReadCommand[] readCommand = readMessages.get(key);
+            msg[0] = ReadCommand.makeReadMessage( readCommand[0] );
             for ( int i = 1; i < msg.length; ++i )
             {
-                msg[i] = ReadMessage.makeReadMessage( readMessage[1] );
+                msg[i] = ReadCommand.makeReadMessage( readCommand[1] );
             }
         }        
         return messages;
     }
     
-    private static MultiQuorumResponseHandler dispatchMessages(Map<String, ReadMessage[]>
readMessages, Map<String, Message[]> messages) throws IOException
+    private static MultiQuorumResponseHandler dispatchMessages(Map<String, ReadCommand[]>
readMessages, Map<String, Message[]> messages) throws IOException
     {
         Set<String> keys = messages.keySet();
         /* This maps the keys to the original data read messages */
-        Map<String, ReadMessage> readMessage = new HashMap<String, ReadMessage>();
+        Map<String, ReadCommand> readMessage = new HashMap<String, ReadCommand>();
         /* This maps the keys to their respective endpoints/replicas */
         Map<String, EndPoint[]> endpoints = new HashMap<String, EndPoint[]>();
         /* Groups the messages that need to be sent to the individual keys */
@@ -700,7 +698,7 @@
     *  @return map containing key ---> Row
     *  @throws IOException, TimeoutException
    */
-    private static Map<String, Row> doStrongReadProtocol(Map<String, ReadMessage[]>
readMessages) throws IOException
+    private static Map<String, Row> doStrongReadProtocol(Map<String, ReadCommand[]>
readMessages) throws IOException
     {        
         Map<String, Row> rows = new HashMap<String, Row>();
         /* Construct the messages to be sent to the replicas */
@@ -769,11 +767,11 @@
     {
         Row row = null;
         long startTime = System.currentTimeMillis();
-        Map<String, ReadMessage> readMessages = new HashMap<String, ReadMessage>();
+        Map<String, ReadCommand> readMessages = new HashMap<String, ReadCommand>();
         for ( String key : keys )
         {
-            ReadMessage readMessage = new ReadMessage(tablename, key, columnFamily, columns);
-            readMessages.put(key, readMessage);
+            ReadCommand readCommand = new ReadCommand(tablename, key, columnFamily, columns);
+            readMessages.put(key, readCommand);
         }
         /* Performs the multiget in parallel */
         Map<String, Row> rows = doReadProtocol(readMessages);
@@ -850,11 +848,11 @@
     {
         Row row = null;
         long startTime = System.currentTimeMillis();
-        Map<String, ReadMessage> readMessages = new HashMap<String, ReadMessage>();
+        Map<String, ReadCommand> readMessages = new HashMap<String, ReadCommand>();
         for ( String key : keys )
         {
-            ReadMessage readMessage = new ReadMessage(tablename, key, columnFamily, start,
count);
-            readMessages.put(key, readMessage);
+            ReadCommand readCommand = new ReadCommand(tablename, key, columnFamily, start,
count);
+            readMessages.put(key, readCommand);
         }
         /* Performs the multiget in parallel */
         Map<String, Row> rows = doReadProtocol(readMessages);
@@ -919,11 +917,11 @@
     {
         Row row = null;
         long startTime = System.currentTimeMillis();
-        Map<String, ReadMessage> readMessages = new HashMap<String, ReadMessage>();
+        Map<String, ReadCommand> readMessages = new HashMap<String, ReadCommand>();
         for ( String key : keys )
         {
-            ReadMessage readMessage = new ReadMessage(tablename, key, columnFamily, sinceTimestamp);
-            readMessages.put(key, readMessage);
+            ReadCommand readCommand = new ReadCommand(tablename, key, columnFamily, sinceTimestamp);
+            readMessages.put(key, readCommand);
         }
         /* Performs the multiget in parallel */
         Map<String, Row> rows = doReadProtocol(readMessages);

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/test/DataImporter.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/test/DataImporter.java?rev=766838&r1=766837&r2=766838&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/test/DataImporter.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/test/DataImporter.java Mon Apr 20 20:05:07
2009
@@ -43,7 +43,7 @@
 import org.apache.cassandra.concurrent.ThreadFactoryImpl;
 import org.apache.cassandra.db.ColumnFamily;
 import org.apache.cassandra.db.IColumn;
-import org.apache.cassandra.db.ReadMessage;
+import org.apache.cassandra.db.ReadCommand;
 import org.apache.cassandra.db.ReadResponseMessage;
 import org.apache.cassandra.db.Row;
 import org.apache.cassandra.db.RowMutation;
@@ -880,10 +880,10 @@
 				key = user + ":1";
 			}
 
-			ReadMessage readMessage = new ReadMessage(tablename_, key);
+			ReadCommand readCommand = new ReadCommand(tablename_, key);
 			Message message = new Message(from_, StorageService.readStage_,
 					StorageService.readVerbHandler_,
-					new Object[] { readMessage });
+					new Object[] {readCommand});
 			IAsyncResult iar = MessagingService.getMessagingInstance().sendRR(
 					message, to_);
 			Object[] result = iar.get();

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/test/StressTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/test/StressTest.java?rev=766838&r1=766837&r2=766838&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/test/StressTest.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/test/StressTest.java Mon Apr 20 20:05:07
2009
@@ -19,10 +19,7 @@
 package org.apache.cassandra.test;
 
 import java.io.IOException;
-import java.text.DateFormat;
-import java.text.SimpleDateFormat;
 import java.util.ArrayList;
-import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Random;
@@ -30,17 +27,13 @@
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.cassandra.analytics.AnalyticsContext;
 import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
 import org.apache.cassandra.concurrent.ThreadFactoryImpl;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.Memtable;
-import org.apache.cassandra.db.ReadMessage;
+import org.apache.cassandra.db.ReadCommand;
 import org.apache.cassandra.db.Row;
 import org.apache.cassandra.db.RowMutation;
 import org.apache.cassandra.db.RowMutationMessage;
 import org.apache.cassandra.net.EndPoint;
-import org.apache.cassandra.net.IAsyncResult;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.service.Cassandra;
@@ -147,7 +140,7 @@
         
     }	
 	
-    public void readLoad(ReadMessage readMessage)
+    public void readLoad(ReadCommand readCommand)
     {
 		IResponseResolver<Row> readResponseResolver = new ReadResponseResolver();
 		QuorumResponseHandler<Row> quorumResponseHandler = new QuorumResponseHandler<Row>(
@@ -155,7 +148,7 @@
 				readResponseResolver);
 		Message message = new Message(from_, StorageService.readStage_,
 				StorageService.readVerbHandler_,
-				new Object[] { readMessage });
+				new Object[] {readCommand});
 		MessagingService.getMessagingInstance().sendOneWay(message, to_);
 		/*IAsyncResult iar = MessagingService.getMessagingInstance().sendRR(message, to_);
 		try
@@ -187,7 +180,7 @@
 	            String stringKey = new Integer(key).toString();
 	            stringKey = stringKey + keyFix_ ;
             	int j = random.nextInt(columns) + 1;
-	            ReadMessage rm = new ReadMessage(tablename_, stringKey, columnFamilyColumn_
+ ":" + columnFix_ + j);
+	            ReadCommand rm = new ReadCommand(tablename_, stringKey, columnFamilyColumn_
+ ":" + columnFix_ + j);
 	            readLoad(rm);
 				if ( requestsPerSecond_ > 1000)
 					Thread.sleep(0, 1000000000/requestsPerSecond_);
@@ -257,7 +250,7 @@
 	            stringKey = stringKey + keyFix_ ;
             	int i = random.nextInt(superColumns) + 1;
             	int j = random.nextInt(columns) + 1;
-	            ReadMessage rm = new ReadMessage(tablename_, stringKey, columnFamilySuperColumn_
+ ":" + superColumnFix_ + i + ":" + columnFix_ + j);
+	            ReadCommand rm = new ReadCommand(tablename_, stringKey, columnFamilySuperColumn_
+ ":" + superColumnFix_ + i + ":" + columnFix_ + j);
 	            readLoad(rm);
 			}
 		}

Modified: incubator/cassandra/trunk/test/org/apache/cassandra/db/ReadMessageTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/org/apache/cassandra/db/ReadMessageTest.java?rev=766838&r1=766837&r2=766838&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/org/apache/cassandra/db/ReadMessageTest.java (original)
+++ incubator/cassandra/trunk/test/org/apache/cassandra/db/ReadMessageTest.java Mon Apr 20
20:05:07 2009
@@ -16,16 +16,16 @@
         colList.add("col1");
         colList.add("col2");
 
-        ReadMessage rm = new ReadMessage("Table1", "row1", "foo", colList);
-        ReadMessage rm2 = serializeAndDeserializeReadMessage(rm);
+        ReadCommand rm = new ReadCommand("Table1", "row1", "foo", colList);
+        ReadCommand rm2 = serializeAndDeserializeReadMessage(rm);
 
         assert rm2.toString().equals(rm.toString());
     }
 
-    private ReadMessage serializeAndDeserializeReadMessage(ReadMessage rm)
+    private ReadCommand serializeAndDeserializeReadMessage(ReadCommand rm)
     {
-        ReadMessage rm2 = null;
-        ReadMessageSerializer rms = (ReadMessageSerializer) ReadMessage.serializer();
+        ReadCommand rm2 = null;
+        ReadCommandSerializer rms = (ReadCommandSerializer) ReadCommand.serializer();
         DataOutputBuffer dos = new DataOutputBuffer();
         DataInputBuffer dis = new DataInputBuffer();
 



Mime
View raw message