Return-Path: Delivered-To: apmail-incubator-cassandra-commits-archive@minotaur.apache.org Received: (qmail 65739 invoked from network); 2 Mar 2010 21:22:47 -0000 Received: from unknown (HELO mail.apache.org) (140.211.11.3) by 140.211.11.9 with SMTP; 2 Mar 2010 21:22:47 -0000 Received: (qmail 25268 invoked by uid 500); 2 Mar 2010 21:22:42 -0000 Delivered-To: apmail-incubator-cassandra-commits-archive@incubator.apache.org Received: (qmail 25232 invoked by uid 500); 2 Mar 2010 21:22:42 -0000 Mailing-List: contact cassandra-commits-help@incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: cassandra-dev@incubator.apache.org Delivered-To: mailing list cassandra-commits@incubator.apache.org Received: (qmail 25224 invoked by uid 99); 2 Mar 2010 21:22:42 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 02 Mar 2010 21:22:42 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 02 Mar 2010 21:22:41 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id E4B7A23889BF; Tue, 2 Mar 2010 21:22:21 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r918186 - in /incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra: db/RowMutationVerbHandler.java net/Header.java net/Message.java service/StorageProxy.java streaming/StreamOut.java Date: Tue, 02 Mar 2010 21:22:21 -0000 To: cassandra-commits@incubator.apache.org From: jbellis@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20100302212221.E4B7A23889BF@eris.apache.org> Author: jbellis Date: Tue Mar 2 21:22:21 2010 New Revision: 918186 URL: http://svn.apache.org/viewvc?rev=918186&view=rev Log: clean up hints/headers, add ability to hint multiple targets per message. patch by jbellis; reviewed by Ryan King for CASSANDRA-822 Modified: incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/Header.java incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/Message.java incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageProxy.java incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/streaming/StreamOut.java Modified: incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java?rev=918186&r1=918185&r2=918186&view=diff ============================================================================== --- incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java (original) +++ incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java Tue Mar 2 21:22:21 2010 @@ -21,12 +21,15 @@ import java.io.*; import java.net.InetAddress; +import java.nio.ByteBuffer; + import org.apache.cassandra.net.IVerbHandler; import org.apache.cassandra.net.Message; import org.apache.log4j.Logger; import org.apache.cassandra.net.*; +import org.apache.cassandra.utils.FBUtilities; public class RowMutationVerbHandler implements IVerbHandler { @@ -45,15 +48,21 @@ /* Check if there were any hints in this message */ byte[] hintedBytes = message.getHeader(RowMutation.HINT); - if ( hintedBytes != null && hintedBytes.length > 0 ) + if (hintedBytes != null) { - InetAddress hint = InetAddress.getByAddress(hintedBytes); - if (logger_.isDebugEnabled()) - logger_.debug("Adding hint for " + hint); - /* add necessary hints to this mutation */ - RowMutation hintedMutation = new RowMutation(Table.SYSTEM_TABLE, rm.getTable()); - hintedMutation.addHints(rm.key(), hintedBytes); - hintedMutation.apply(); + assert hintedBytes.length > 0; + ByteBuffer bb = ByteBuffer.wrap(hintedBytes); + byte[] addressBytes = new byte[FBUtilities.getLocalAddress().getAddress().length]; + while (bb.remaining() > 0) + { + bb.get(addressBytes); + InetAddress hint = InetAddress.getByAddress(addressBytes); + if (logger_.isDebugEnabled()) + logger_.debug("Adding hint for " + hint); + RowMutation hintedMutation = new RowMutation(Table.SYSTEM_TABLE, rm.getTable()); + hintedMutation.addHints(rm.key(), addressBytes); + hintedMutation.apply(); + } } Table.open(rm.getTable()).apply(rm, bytes, true); Modified: incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/Header.java URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/Header.java?rev=918186&r1=918185&r2=918186&view=diff ============================================================================== --- incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/Header.java (original) +++ incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/Header.java Tue Mar 2 21:22:21 2010 @@ -99,35 +99,15 @@ messageId_ = id; } - void setMessageType(String type) - { - type_ = type; - } - - void setMessageVerb(StorageService.Verb verb) - { - verb_ = verb; - } - byte[] getDetail(Object key) { return details_.get(key); } - - void removeDetail(Object key) - { - details_.remove(key); - } - - void addDetail(String key, byte[] value) + + void setDetail(String key, byte[] value) { details_.put(key, value); } - - Map getDetails() - { - return details_; - } } class HeaderSerializer implements ICompactSerializer
Modified: incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/Message.java URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/Message.java?rev=918186&r1=918185&r2=918186&view=diff ============================================================================== --- incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/Message.java (original) +++ incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/Message.java Tue Mar 2 21:22:21 2010 @@ -21,7 +21,6 @@ import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; -import java.util.Map; import java.net.InetAddress; import org.apache.cassandra.concurrent.StageManager; @@ -64,14 +63,9 @@ return header_.getDetail(key); } - public void addHeader(String key, byte[] value) + public void setHeader(String key, byte[] value) { - header_.addDetail(key, value); - } - - public Map getHeaders() - { - return header_.getDetails(); + header_.setDetail(key, value); } public byte[] getMessageBody() Modified: incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageProxy.java URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageProxy.java?rev=918186&r1=918185&r2=918186&view=diff ============================================================================== --- incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageProxy.java (original) +++ incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageProxy.java Tue Mar 2 21:22:21 2010 @@ -27,11 +27,10 @@ import java.util.concurrent.Future; import java.lang.management.ManagementFactory; +import org.apache.commons.lang.ArrayUtils; import org.apache.commons.lang.StringUtils; import com.google.common.collect.AbstractIterator; -import com.google.common.collect.Iterables; -import com.google.common.collect.Iterators; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.*; import java.net.InetAddress; @@ -145,7 +144,7 @@ else { Message hintedMessage = rm.makeRowMutationMessage(); - hintedMessage.addHeader(RowMutation.HINT, target.getAddress()); + addHintHeader(hintedMessage, target); if (logger.isDebugEnabled()) logger.debug("insert writing key " + rm.key() + " to " + hintedMessage.getMessageId() + "@" + hintedTarget + " for " + target); MessagingService.instance.sendOneWay(hintedMessage, hintedTarget); @@ -163,7 +162,14 @@ writeStats.addNano(System.nanoTime() - startTime); } } - + + private static void addHintHeader(Message message, InetAddress target) + { + byte[] oldHint = message.getHeader(RowMutation.HINT); + byte[] hint = oldHint == null ? target.getAddress() : ArrayUtils.addAll(oldHint, target.getAddress()); + message.setHeader(RowMutation.HINT, hint); + } + public static void mutateBlocking(List mutations, ConsistencyLevel consistency_level) throws UnavailableException, TimeoutException { long startTime = System.nanoTime(); @@ -214,7 +220,7 @@ else { Message hintedMessage = rm.makeRowMutationMessage(); - hintedMessage.addHeader(RowMutation.HINT, naturalTarget.getAddress()); + addHintHeader(hintedMessage, naturalTarget); // (hints are part of the callback and count towards consistency only under CL.ANY if (consistency_level == ConsistencyLevel.ANY) MessagingService.instance.addCallback(responseHandler, hintedMessage.getMessageId()); @@ -343,7 +349,7 @@ if (logger.isDebugEnabled()) logger.debug("weakreadremote reading " + command + " from " + message.getMessageId() + "@" + endPoint); - message.addHeader(ReadCommand.DO_REPAIR, ReadCommand.DO_REPAIR.getBytes()); + message.setHeader(ReadCommand.DO_REPAIR, ReadCommand.DO_REPAIR.getBytes()); iars.add(MessagingService.instance.sendRR(message, endPoint)); } Modified: incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/streaming/StreamOut.java URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/streaming/StreamOut.java?rev=918186&r1=918185&r2=918186&view=diff ============================================================================== --- incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/streaming/StreamOut.java (original) +++ incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/streaming/StreamOut.java Tue Mar 2 21:22:21 2010 @@ -130,7 +130,7 @@ StreamOutManager.get(target).addFilesToStream(pendingFiles); StreamInitiateMessage biMessage = new StreamInitiateMessage(pendingFiles); Message message = StreamInitiateMessage.makeStreamInitiateMessage(biMessage); - message.addHeader(StreamOut.TABLE_NAME, table.getBytes()); + message.setHeader(StreamOut.TABLE_NAME, table.getBytes()); if (logger.isDebugEnabled()) logger.debug("Sending a stream initiate message to " + target + " ..."); MessagingService.instance.sendOneWay(message, target);