cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbel...@apache.org
Subject svn commit: r929767 - /cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyChecker.java
Date Thu, 01 Apr 2010 01:35:59 GMT
Author: jbellis
Date: Thu Apr  1 01:35:59 2010
New Revision: 929767

URL: http://svn.apache.org/viewvc?rev=929767&view=rev
Log:
merge from 0.6

Added:
    cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyChecker.java   (with props)

Added: cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyChecker.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyChecker.java?rev=929767&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyChecker.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyChecker.java Thu Apr  1 01:35:59 2010
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.commons.lang.StringUtils;
+
+import org.apache.cassandra.cache.ICacheExpungeHook;
+import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.ReadCommand;
+import org.apache.cassandra.db.ReadResponse;
+import org.apache.cassandra.db.Row;
+import org.apache.cassandra.net.IAsyncCallback;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.utils.ExpiringMap;
+import org.apache.cassandra.utils.FBUtilities;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Runnable that checks whether the replicas of a row agree with the copy supplied at
+ * construction time.
+ *
+ * {@link #run()} sends a digest-only version of the read command to every replica. Once all
+ * of them have answered, each returned digest is compared with the digest of the supplied
+ * row; on the first mismatch a full (non-digest) read is sent to the replicas plus the local
+ * node, and the replies are collected by a {@link DataRepairHandler}.
+ */
+class ConsistencyChecker implements Runnable
+{
+    // Named after this class so that log output is attributed to ConsistencyChecker.
+    private static final Logger logger_ = LoggerFactory.getLogger(ConsistencyChecker.class);
+    // Value handed to the ExpiringMap constructor below; presumably the entry lifetime in
+    // milliseconds -- confirm against ExpiringMap.
+    private static final long scheduledTimeMillis_ = 600;
+    // A DataRepairHandler registers itself here (keyed by message id) once a majority of
+    // replies has arrived; it is passed as the hook argument of put().
+    private static final ExpiringMap<String, String> readRepairTable_ = new ExpiringMap<String, String>(scheduledTimeMillis_);
+
+    private final String table_;
+    private final Row row_;
+    protected final List<InetAddress> replicas_;
+    private final ReadCommand readCommand_;
+
+    /**
+     * @param table       table name, forwarded to the ReadResponseResolver during repair
+     * @param row         row whose column family digest is the reference for the comparison
+     * @param replicas    endpoints that are asked for a digest; the list is stored as-is and
+     *                    is never modified by this class
+     * @param readCommand command that is copied for both the digest and the repair read
+     */
+    public ConsistencyChecker(String table, Row row, List<InetAddress> replicas, ReadCommand readCommand)
+    {
+        table_ = table;
+        row_ = row;
+        replicas_ = replicas;
+        readCommand_ = readCommand;
+    }
+
+    /**
+     * Sends a digest-only copy of the read command to every replica; the replies are handled
+     * by a new {@link DigestResponseHandler}.
+     *
+     * @throws RuntimeException wrapping an IOException raised while building the message
+     */
+    public void run()
+    {
+        ReadCommand readCommandDigestOnly = constructReadMessage(true);
+        try
+        {
+            Message message = readCommandDigestOnly.makeReadMessage();
+            if (logger_.isDebugEnabled())
+                logger_.debug("Reading consistency digest for " + readCommand_.key + " from " + message.getMessageId() + "@[" + StringUtils.join(replicas_, ", ") + "]");
+            MessagingService.instance.sendRR(message, replicas_.toArray(new InetAddress[replicas_.size()]), new DigestResponseHandler());
+        }
+        catch (IOException ex)
+        {
+            throw new RuntimeException(ex);
+        }
+    }
+
+    /**
+     * Returns a copy of the original read command with its digest flag set as requested.
+     */
+    private ReadCommand constructReadMessage(boolean isDigestQuery)
+    {
+        ReadCommand readCommand = readCommand_.copy();
+        readCommand.setDigestQuery(isDigestQuery);
+        return readCommand;
+    }
+
+    /**
+     * Collects digest replies and, once as many replies as replicas have arrived, compares
+     * each digest with the digest of the enclosing checker's row.
+     */
+    class DigestResponseHandler implements IAsyncCallback
+    {
+        final Collection<Message> responses_ = new LinkedBlockingQueue<Message>();
+
+        // synchronized so that the add and the "size() ==" check happen atomically
+        public synchronized void response(Message msg)
+        {
+            responses_.add(msg);
+            if (responses_.size() != ConsistencyChecker.this.replicas_.size())
+                return;
+
+            try
+            {
+                // The reference digest does not depend on the response, so compute it once.
+                byte[] localDigest = ColumnFamily.digest(row_.cf);
+                for (Message response : responses_)
+                {
+                    byte[] body = response.getMessageBody();
+                    ByteArrayInputStream bufIn = new ByteArrayInputStream(body);
+                    ReadResponse result = ReadResponse.serializer().deserialize(new DataInputStream(bufIn));
+                    if (!Arrays.equals(localDigest, result.digest()))
+                    {
+                        // One mismatch is enough; repair is issued at most once.
+                        doReadRepair();
+                        break;
+                    }
+                }
+            }
+            catch (Exception e)
+            {
+                throw new RuntimeException("Error handling responses for " + row_, e);
+            }
+        }
+
+        /**
+         * Sends a full (non-digest) read to every replica plus the local address and
+         * registers a {@link DataRepairHandler} for the replies.
+         *
+         * @throws IOException if the read message cannot be built
+         */
+        private void doReadRepair() throws IOException
+        {
+            // Build the target set in a fresh array rather than appending to replicas_:
+            // that list belongs to whoever constructed the checker and may be shared or
+            // unmodifiable. toArray() fills the first replicaCount slots; the extra slot
+            // receives the local address.
+            int replicaCount = replicas_.size();
+            InetAddress[] repairTargets = replicas_.toArray(new InetAddress[replicaCount + 1]);
+            repairTargets[replicaCount] = FBUtilities.getLocalAddress();
+
+            IResponseResolver<Row> readResponseResolver = new ReadResponseResolver(table_, repairTargets.length);
+            IAsyncCallback responseHandler = new DataRepairHandler(repairTargets.length, readResponseResolver);
+            ReadCommand readCommand = constructReadMessage(false);
+            Message message = readCommand.makeReadMessage();
+            if (logger_.isDebugEnabled())
+                logger_.debug("Performing read repair for " + readCommand_.key + " to " + message.getMessageId() + "@[" + StringUtils.join(repairTargets, ", ") + "]");
+            MessagingService.instance.sendRR(message, repairTargets, responseHandler);
+        }
+    }
+
+    /**
+     * Collects the full-data replies of a read repair. When the number of replies reaches a
+     * simple majority of the expected count, the handler stores itself in the shared
+     * read-repair table; {@link #callMe} then hands all collected replies to the resolver.
+     */
+    static class DataRepairHandler implements IAsyncCallback, ICacheExpungeHook<String, String>
+    {
+        private final Collection<Message> responses_ = new LinkedBlockingQueue<Message>();
+        private final IResponseResolver<Row> readResponseResolver_;
+        private final int majority_;
+
+        /**
+         * @param responseCount        number of endpoints the repair read was sent to
+         * @param readResponseResolver resolver that receives the collected replies
+         */
+        DataRepairHandler(int responseCount, IResponseResolver<Row> readResponseResolver)
+        {
+            readResponseResolver_ = readResponseResolver;
+            majority_ = (responseCount / 2) + 1;
+        }
+
+        // synchronized so the " == majority" is safe
+        public synchronized void response(Message message)
+        {
+            if (logger_.isDebugEnabled())
+                logger_.debug("Received responses in DataRepairHandler : " + message.toString());
+            responses_.add(message);
+            // Register exactly once: only the reply that reaches the majority triggers it.
+            if (responses_.size() == majority_)
+            {
+                String messageId = message.getMessageId();
+                readRepairTable_.put(messageId, messageId, this);
+            }
+        }
+
+        /**
+         * Resolves all replies collected so far. Presumably invoked by the ExpiringMap when
+         * the entry registered in {@link #response} is expunged -- confirm against ExpiringMap.
+         *
+         * @throws RuntimeException wrapping any exception thrown by the resolver
+         */
+        public void callMe(String key, String value)
+        {
+            try
+            {
+                readResponseResolver_.resolve(responses_);
+            }
+            catch (Exception ex)
+            {
+                throw new RuntimeException(ex);
+            }
+        }
+    }
+}

Propchange: cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyChecker.java
------------------------------------------------------------------------------
    svn:eol-style = native



Mime
View raw message