Return-Path: Delivered-To: apmail-incubator-cassandra-commits-archive@minotaur.apache.org Received: (qmail 22804 invoked from network); 30 Sep 2009 19:41:02 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.3) by minotaur.apache.org with SMTP; 30 Sep 2009 19:41:02 -0000 Received: (qmail 70721 invoked by uid 500); 30 Sep 2009 19:41:02 -0000 Delivered-To: apmail-incubator-cassandra-commits-archive@incubator.apache.org Received: (qmail 70695 invoked by uid 500); 30 Sep 2009 19:41:02 -0000 Mailing-List: contact cassandra-commits-help@incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: cassandra-dev@incubator.apache.org Delivered-To: mailing list cassandra-commits@incubator.apache.org Received: (qmail 70685 invoked by uid 99); 30 Sep 2009 19:41:02 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 30 Sep 2009 19:41:02 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 30 Sep 2009 19:40:50 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 9054E23888E8; Wed, 30 Sep 2009 19:39:58 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r820417 - in /incubator/cassandra/trunk/src/java/org/apache/cassandra: db/ service/ utils/ Date: Wed, 30 Sep 2009 19:39:58 -0000 To: cassandra-commits@incubator.apache.org From: jbellis@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20090930193958.9054E23888E8@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: jbellis Date: Wed Sep 30 19:39:57 2009 New Revision: 820417 URL: http://svn.apache.org/viewvc?rev=820417&view=rev Log: formatting + cleanup. patch by jbellis for CASSANDRA-462 Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Column.java incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadResponse.java incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Row.java incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyManager.java incubator/cassandra/trunk/src/java/org/apache/cassandra/service/IResponseResolver.java incubator/cassandra/trunk/src/java/org/apache/cassandra/service/QuorumResponseHandler.java incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Column.java URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Column.java?rev=820417&r1=820416&r2=820417&view=diff ============================================================================== --- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Column.java (original) +++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Column.java Wed Sep 30 19:39:57 2009 @@ -23,6 +23,7 @@ import java.security.MessageDigest; import java.io.IOException; +import org.apache.log4j.Logger; import org.apache.commons.lang.ArrayUtils; import org.apache.cassandra.db.marshal.AbstractType; @@ -37,6 +38,8 @@ public final class Column implements IColumn { + private static Logger logger_ = Logger.getLogger(Column.class); + private static ColumnSerializer serializer_ = new ColumnSerializer(); public static ColumnSerializer serializer() Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadResponse.java URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadResponse.java?rev=820417&r1=820416&r2=820417&view=diff ============================================================================== --- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadResponse.java (original) +++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadResponse.java Wed Sep 30 19:39:57 2009 @@ -115,20 +115,12 @@ boolean isDigest = dis.readBoolean(); Row row = null; - if ( !isDigest ) + if (!isDigest) { row = Row.serializer().deserialize(dis); } - - ReadResponse rmsg = null; - if( isDigest ) - { - rmsg = new ReadResponse(digest); - } - else - { - rmsg = new ReadResponse(row); - } + + ReadResponse rmsg = isDigest ? new ReadResponse(digest) : new ReadResponse(row); rmsg.setIsDigestQuery(isDigest); return rmsg; } Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java?rev=820417&r1=820416&r2=820417&view=diff ============================================================================== --- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java (original) +++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java Wed Sep 30 19:39:57 2009 @@ -75,9 +75,8 @@ } ReadCommand readCommand = ReadCommand.serializer().deserialize(readCtx.bufIn_); Table table = Table.open(readCommand.table); - Row row = null; - row = readCommand.getRow(table); - ReadResponse readResponse = null; + Row row = readCommand.getRow(table); + ReadResponse readResponse; if (readCommand.isDigestQuery()) { readResponse = new ReadResponse(row.digest()); Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Row.java URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Row.java?rev=820417&r1=820416&r2=820417&view=diff ============================================================================== --- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Row.java (original) +++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Row.java Wed Sep 30 19:39:57 2009 @@ -29,13 +29,11 @@ import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; -import org.apache.commons.lang.ArrayUtils; import org.apache.commons.lang.StringUtils; import org.apache.log4j.Logger; import org.apache.cassandra.io.ICompactSerializer; import org.apache.cassandra.io.DataOutputBuffer; -import org.apache.cassandra.utils.FBUtilities; public class Row { Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyManager.java URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyManager.java?rev=820417&r1=820416&r2=820417&view=diff ============================================================================== --- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyManager.java (original) +++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyManager.java Wed Sep 30 19:39:57 2009 @@ -23,7 +23,6 @@ import java.util.Arrays; import java.util.List; -import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.ReadCommand; import org.apache.cassandra.db.ReadResponse; import org.apache.cassandra.db.Row; @@ -129,21 +128,17 @@ public void callMe(String key, String value) { - handleResponses(); - } - - private void handleResponses() - { - try + try { readResponseResolver_.resolve(new ArrayList(responses_)); - } - catch ( DigestMismatchException ex ) - { - throw new RuntimeException(ex); - } - } - } + } + catch (Exception ex) + { + throw new RuntimeException(ex); + } + } + + } private static long scheduledTimeMillis_ = 600; private static ICachetable readRepairTable_ = new Cachetable(scheduledTimeMillis_); Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/IResponseResolver.java URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/IResponseResolver.java?rev=820417&r1=820416&r2=820417&view=diff ============================================================================== --- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/IResponseResolver.java (original) +++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/IResponseResolver.java Wed Sep 30 19:39:57 2009 @@ -19,6 +19,7 @@ package org.apache.cassandra.service; import java.util.List; +import java.io.IOException; import org.apache.cassandra.net.Message; @@ -32,7 +33,7 @@ * repairs . Hence you need to derive a response resolver based on your * needs from this interface. */ - public T resolve(List responses) throws DigestMismatchException; + public T resolve(List responses) throws DigestMismatchException, IOException; public boolean isDataPresent(List responses); } Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/QuorumResponseHandler.java URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/QuorumResponseHandler.java?rev=820417&r1=820416&r2=820417&view=diff ============================================================================== --- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/QuorumResponseHandler.java (original) +++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/QuorumResponseHandler.java Wed Sep 30 19:39:57 2009 @@ -24,6 +24,7 @@ import java.util.concurrent.locks.*; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.io.IOException; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.net.IAsyncCallback; @@ -55,64 +56,67 @@ startTime_ = System.currentTimeMillis(); } - public T get() throws TimeoutException, DigestMismatchException + public T get() throws TimeoutException, DigestMismatchException, IOException { - lock_.lock(); + lock_.lock(); try - { - boolean bVal = true; + { + boolean bVal = true; try { - if ( !done_.get() ) + if (!done_.get()) { long timeout = System.currentTimeMillis() - startTime_ + DatabaseDescriptor.getRpcTimeout(); - if(timeout > 0) + if (timeout > 0) + { bVal = condition_.await(timeout, TimeUnit.MILLISECONDS); + } else + { bVal = false; + } } } - catch ( InterruptedException ex ) + catch (InterruptedException ex) { - if (logger_.isDebugEnabled()) - logger_.debug( LogUtil.throwableToString(ex) ); + throw new AssertionError(ex); } - - if ( !bVal && !done_.get() ) + + if (!bVal && !done_.get()) { StringBuilder sb = new StringBuilder(""); - for ( Message message : responses_ ) + for (Message message : responses_) { - sb.append(message.getFrom()); - } - throw new TimeoutException("Operation timed out - received only " + responses_.size() + " responses from " + sb.toString() + " ."); + sb.append(message.getFrom()); + } + throw new TimeoutException("Operation timed out - received only " + responses_.size() + " responses from " + sb.toString() + " ."); } } finally { lock_.unlock(); - for(Message response : responses_) + for (Message response : responses_) { - MessagingService.removeRegisteredCallback( response.getMessageId() ); + MessagingService.removeRegisteredCallback(response.getMessageId()); } } - return responseResolver_.resolve( responses_); + return responseResolver_.resolve(responses_); } public void response(Message message) { lock_.lock(); try - { - if ( !done_.get() ) + { + if (!done_.get()) { - responses_.add( message ); - if ( responses_.size() >= responseCount_ && responseResolver_.isDataPresent(responses_)) - { - done_.set(true); - condition_.signal(); - } + responses_.add(message); + if (responses_.size() >= responseCount_ && responseResolver_.isDataPresent(responses_)) + { + done_.set(true); + condition_.signal(); + } } } finally Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java?rev=820417&r1=820416&r2=820417&view=diff ============================================================================== --- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java (original) +++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java Wed Sep 30 19:39:57 2009 @@ -54,8 +54,8 @@ * repair request should be scheduled. * */ - public Row resolve(List responses) throws DigestMismatchException - { + public Row resolve(List responses) throws DigestMismatchException, IOException + { long startTime = System.currentTimeMillis(); Row retRow = null; List rowList = new ArrayList(); @@ -76,38 +76,31 @@ { byte[] body = response.getMessageBody(); bufIn.reset(body, body.length); - try + long start = System.currentTimeMillis(); + ReadResponse result = ReadResponse.serializer().deserialize(bufIn); + if (logger_.isDebugEnabled()) + logger_.debug( "Response deserialization time : " + (System.currentTimeMillis() - start) + " ms."); + if (result.isDigestQuery()) { - long start = System.currentTimeMillis(); - ReadResponse result = ReadResponse.serializer().deserialize(bufIn); - if (logger_.isDebugEnabled()) - logger_.debug( "Response deserialization time : " + (System.currentTimeMillis() - start) + " ms."); - if(!result.isDigestQuery()) - { - rowList.add(result.row()); - endPoints.add(response.getFrom()); - key = result.row().key(); - table = result.row().getTable(); - } - else - { - digest = result.digest(); - isDigestQuery = true; - } + digest = result.digest(); + isDigestQuery = true; } - catch( IOException ex ) + else { - logger_.info(LogUtil.throwableToString(ex)); + rowList.add(result.row()); + endPoints.add(response.getFrom()); + key = result.row().key(); + table = result.row().getTable(); } - } + } // If there was a digest query compare it with all the data digests // If there is a mismatch then throw an exception so that read repair can happen. - if(isDigestQuery) - { - for(Row row: rowList) - { - if( !Arrays.equals(row.digest(), digest) ) - { + if (isDigestQuery) + { + for (Row row : rowList) + { + if (!Arrays.equals(row.digest(), digest)) + { /* Wrap the key as the context in this exception */ throw new DigestMismatchException(row.key()); } @@ -115,36 +108,36 @@ } /* If the rowList is empty then we had some exception above. */ - if ( rowList.size() == 0 ) + if (rowList.size() == 0) { return retRow; } - + /* Now calculate the resolved row */ - retRow = new Row(table, key); - for (int i = 0 ; i < rowList.size(); i++) - { - retRow.repair(rowList.get(i)); - } + retRow = new Row(table, key); + for (int i = 0; i < rowList.size(); i++) + { + retRow.repair(rowList.get(i)); + } // At this point we have the return row . - // Now we need to calculate the difference - // so that we can schedule read repairs - for (int i = 0 ; i < rowList.size(); i++) - { - // since retRow is the resolved row it can be used as the super set - Row diffRow = rowList.get(i).diff(retRow); - if(diffRow == null) // no repair needs to happen - continue; - // create the row mutation message based on the diff and schedule a read repair - RowMutation rowMutation = new RowMutation(table, key); - for (ColumnFamily cf : diffRow.getColumnFamilies()) - { - rowMutation.add(cf); - } + // Now we need to calculate the difference + // so that we can schedule read repairs + for (int i = 0; i < rowList.size(); i++) + { + // since retRow is the resolved row it can be used as the super set + Row diffRow = rowList.get(i).diff(retRow); + if (diffRow == null) // no repair needs to happen + continue; + // create the row mutation message based on the diff and schedule a read repair + RowMutation rowMutation = new RowMutation(table, key); + for (ColumnFamily cf : diffRow.getColumnFamilies()) + { + rowMutation.add(cf); + } RowMutationMessage rowMutationMessage = new RowMutationMessage(rowMutation); - ReadRepairManager.instance().schedule(endPoints.get(i),rowMutationMessage); - } + ReadRepairManager.instance().schedule(endPoints.get(i), rowMutationMessage); + } if (logger_.isDebugEnabled()) logger_.debug("resolve: " + (System.currentTimeMillis() - startTime) + " ms."); return retRow; @@ -152,26 +145,26 @@ public boolean isDataPresent(List responses) { - boolean isDataPresent = false; - for (Message response : responses) - { + boolean isDataPresent = false; + for (Message response : responses) + { byte[] body = response.getMessageBody(); - DataInputBuffer bufIn = new DataInputBuffer(); + DataInputBuffer bufIn = new DataInputBuffer(); bufIn.reset(body, body.length); try { - ReadResponse result = ReadResponse.serializer().deserialize(bufIn); - if(!result.isDigestQuery()) - { - isDataPresent = true; - } + ReadResponse result = ReadResponse.serializer().deserialize(bufIn); + if (!result.isDigestQuery()) + { + isDataPresent = true; + } bufIn.close(); } - catch(IOException ex) + catch (IOException ex) { logger_.info(LogUtil.throwableToString(ex)); - } - } - return isDataPresent; - } + } + } + return isDataPresent; + } } Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=820417&r1=820416&r2=820417&view=diff ============================================================================== --- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java (original) +++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Wed Sep 30 19:39:57 2009 @@ -352,10 +352,7 @@ Message message = command.makeReadMessage(); Message messageDigestOnly = readMessageDigestOnly.makeReadMessage(); - IResponseResolver readResponseResolver = new ReadResponseResolver(); - QuorumResponseHandler quorumResponseHandler = new QuorumResponseHandler( - DatabaseDescriptor.getQuorum(), - readResponseResolver); + QuorumResponseHandler quorumResponseHandler = new QuorumResponseHandler(DatabaseDescriptor.getQuorum(), new ReadResponseResolver()); EndPoint dataPoint = StorageService.instance().findSuitableEndPoint(command.key); List endpointList = new ArrayList(Arrays.asList(StorageService.instance().getReadStorageEndPoints(command.key))); /* Remove the local storage endpoint from the list. */ @@ -401,7 +398,7 @@ } catch (DigestMismatchException ex) { - if ( DatabaseDescriptor.getConsistencyCheck()) + if (DatabaseDescriptor.getConsistencyCheck()) { IResponseResolver readResponseResolverRepair = new ReadResponseResolver(); QuorumResponseHandler quorumResponseHandlerRepair = new QuorumResponseHandler( @@ -409,8 +406,7 @@ readResponseResolverRepair); logger.info("DigestMismatchException: " + command.key); Message messageRepair = command.makeReadMessage(); - MessagingService.getMessagingInstance().sendRR(messageRepair, commandEndPoints.get(commandIndex), - quorumResponseHandlerRepair); + MessagingService.getMessagingInstance().sendRR(messageRepair, commandEndPoints.get(commandIndex), quorumResponseHandlerRepair); try { row = quorumResponseHandlerRepair.get(); Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java?rev=820417&r1=820416&r2=820417&view=diff ============================================================================== --- incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java (original) +++ incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java Wed Sep 30 19:39:57 2009 @@ -398,15 +398,12 @@ return bytes; } - public static String bytesToHex(byte[] buf) + public static String bytesToHex(byte[] bytes) { - char[] chars = new char[2*buf.length]; - for (int i = 0; i < buf.length; i++) - { - chars[i*2] = HEX_CHARS[(buf[i] & 0xF0) >>> 4]; - chars[i*2+1] = HEX_CHARS[buf[i] & 0x0F]; - } - return new String(chars); + StringBuilder sb = new StringBuilder(); + for (byte b : bytes) + sb.append(Integer.toHexString(b & 0xff)); + return sb.toString(); } public static String mapToString(Map map)