Return-Path: Delivered-To: apmail-incubator-cassandra-commits-archive@minotaur.apache.org Received: (qmail 37807 invoked from network); 17 Nov 2009 13:34:18 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.3) by minotaur.apache.org with SMTP; 17 Nov 2009 13:34:18 -0000 Received: (qmail 13940 invoked by uid 500); 17 Nov 2009 13:34:18 -0000 Delivered-To: apmail-incubator-cassandra-commits-archive@incubator.apache.org Received: (qmail 13913 invoked by uid 500); 17 Nov 2009 13:34:18 -0000 Mailing-List: contact cassandra-commits-help@incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: cassandra-dev@incubator.apache.org Delivered-To: mailing list cassandra-commits@incubator.apache.org Received: (qmail 13903 invoked by uid 99); 17 Nov 2009 13:34:18 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 17 Nov 2009 13:34:18 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 17 Nov 2009 13:34:15 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 23FB223888D4; Tue, 17 Nov 2009 13:33:54 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r881280 - /incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Date: Tue, 17 Nov 2009 13:33:54 -0000 To: cassandra-commits@incubator.apache.org From: jbellis@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20091117133354.23FB223888D4@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: jbellis Date: Tue Nov 17 13:33:53 2009 New Revision: 881280 URL: http://svn.apache.org/viewvc?rev=881280&view=rev Log: make local insert() skip MessagingService and fix HH write patch by jbellis; reviewed by Jaakko Laine for CASSANDRA-558 Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=881280&r1=881279&r2=881280&view=diff ============================================================================== --- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java (original) +++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Tue Nov 17 13:33:53 2009 @@ -19,6 +19,7 @@ import java.io.IOError; import java.io.IOException; +import java.io.IOError; import java.util.*; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -71,65 +72,71 @@ } /** - * This method is responsible for creating Message to be - * sent over the wire to N replicas where some of the replicas - * may be hints. - */ - private static Map createWriteMessages(RowMutation rm, Map endpointMap) throws IOException - { - Map messageMap = new HashMap(); - Message message = rm.makeRowMutationMessage(); - - for (Map.Entry entry : endpointMap.entrySet()) - { - InetAddress target = entry.getKey(); - InetAddress hintedTarget = entry.getValue(); - if (target.equals(hintedTarget)) - { - messageMap.put(target, message); - } - else - { - Message hintedMessage = rm.makeRowMutationMessage(); - hintedMessage.addHeader(RowMutation.HINT, hintedTarget.getAddress()); - if (logger.isDebugEnabled()) - logger.debug("Sending the hint of " + hintedTarget + " to " + target); - messageMap.put(hintedTarget, hintedMessage); - } - } - return messageMap; - } - - /** * Use this method to have this RowMutation applied * across all replicas. This method will take care * of the possibility of a replica being down and hint - * the data across to some other replica. + * the data across to some other replica. + * + * This is the ZERO consistency level. We do not wait for replies. + * * @param rm the mutation to be applied across the replicas */ - public static void insert(RowMutation rm) + public static void insert(final RowMutation rm) { - /* - * Get the N nodes from storage service where the data needs to be - * replicated - * Construct a message for write - * Send them asynchronously to the replicas. - */ - long startTime = System.currentTimeMillis(); try { List naturalEndpoints = StorageService.instance().getNaturalEndpoints(rm.key()); - // (This is the ZERO consistency level, so user doesn't care if we don't really have N destinations available.) Map endpointMap = StorageService.instance().getHintedEndpointMap(rm.key(), naturalEndpoints); - Map messageMap = createWriteMessages(rm, endpointMap); - for (Map.Entry entry : messageMap.entrySet()) - { - Message message = entry.getValue(); - InetAddress endpoint = entry.getKey(); - if (logger.isDebugEnabled()) - logger.debug("insert writing key " + rm.key() + " to " + message.getMessageId() + "@" + endpoint); - MessagingService.instance().sendOneWay(message, endpoint); + Message unhintedMessage = null; // lazy initialize for non-local, unhinted writes + + // 3 cases: + // 1. local, unhinted write: run directly on write stage + // 2. non-local, unhinted write: send row mutation message + // 3. hinted write: add hint header, and send message + for (Map.Entry entry : endpointMap.entrySet()) + { + InetAddress target = entry.getKey(); + InetAddress hintedTarget = entry.getValue(); + if (target.equals(hintedTarget)) + { + if (target.equals(FBUtilities.getLocalAddress())) + { + if (logger.isDebugEnabled()) + logger.debug("insert writing local key " + rm.key()); + Runnable runnable = new Runnable() + { + public void run() + { + try + { + rm.apply(); + } + catch (IOException e) + { + throw new IOError(e); + } + } + }; + StageManager.getStage(StageManager.mutationStage_).execute(runnable); + } + else + { + if (unhintedMessage == null) + unhintedMessage = rm.makeRowMutationMessage(); + if (logger.isDebugEnabled()) + logger.debug("insert writing key " + rm.key() + " to " + unhintedMessage.getMessageId() + "@" + target); + MessagingService.instance().sendOneWay(unhintedMessage, target); + } + } + else + { + Message hintedMessage = rm.makeRowMutationMessage(); + hintedMessage.addHeader(RowMutation.HINT, target.getAddress()); + if (logger.isDebugEnabled()) + logger.debug("insert writing key " + rm.key() + " to " + unhintedMessage.getMessageId() + "@" + hintedTarget + " for " + target); + MessagingService.instance().sendOneWay(hintedMessage, hintedTarget); + } } } catch (IOException e)