Return-Path: X-Original-To: apmail-cassandra-commits-archive@www.apache.org Delivered-To: apmail-cassandra-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 616DE9555 for ; Fri, 2 Dec 2011 10:53:20 +0000 (UTC) Received: (qmail 21708 invoked by uid 500); 2 Dec 2011 10:53:20 -0000 Delivered-To: apmail-cassandra-commits-archive@cassandra.apache.org Received: (qmail 21679 invoked by uid 500); 2 Dec 2011 10:53:20 -0000 Mailing-List: contact commits-help@cassandra.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@cassandra.apache.org Delivered-To: mailing list commits@cassandra.apache.org Received: (qmail 21671 invoked by uid 99); 2 Dec 2011 10:53:20 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 02 Dec 2011 10:53:20 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 02 Dec 2011 10:53:18 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id EBD3123888FD for ; Fri, 2 Dec 2011 10:52:57 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1209399 - in /cassandra/branches/cassandra-1.0: CHANGES.txt src/java/org/apache/cassandra/service/AntiEntropyService.java Date: Fri, 02 Dec 2011 10:52:57 -0000 To: commits@cassandra.apache.org From: slebresne@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20111202105257.EBD3123888FD@eris.apache.org> Author: slebresne Date: Fri Dec 2 10:52:57 2011 New Revision: 1209399 URL: http://svn.apache.org/viewvc?rev=1209399&view=rev Log: fix potential race in AES when repair fails patch by slebresne; reviewed by amorton for CASSANDRA-3548 Modified: cassandra/branches/cassandra-1.0/CHANGES.txt cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/AntiEntropyService.java Modified: cassandra/branches/cassandra-1.0/CHANGES.txt URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/CHANGES.txt?rev=1209399&r1=1209398&r2=1209399&view=diff ============================================================================== --- cassandra/branches/cassandra-1.0/CHANGES.txt (original) +++ cassandra/branches/cassandra-1.0/CHANGES.txt Fri Dec 2 10:52:57 2011 @@ -7,6 +7,7 @@ be qualified by keyspace (CASSANDRA-3419) * always remove endpoints from delevery queue in HH (CASSANDRA-3546) * fix race between cf flush and its 2ndary indexes flush (CASSANDRA-3547) + * fix potential race in AES when a repair fails (CASSANDRA-3548) 1.0.5 Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/AntiEntropyService.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/AntiEntropyService.java?rev=1209399&r1=1209398&r2=1209399&view=diff ============================================================================== --- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/AntiEntropyService.java (original) +++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/AntiEntropyService.java Fri Dec 2 10:52:57 2011 @@ -180,14 +180,15 @@ public class AntiEntropyService return; } - if (session.terminated()) + RepairSession.RepairJob job = session.jobs.peek(); + if (job == null) + { + assert session.terminated(); return; + } logger.info(String.format("[repair #%s] Received merkle tree for %s from %s", session.getName(), request.cf.right, request.endpoint)); - RepairSession.RepairJob job = session.jobs.peek(); - assert job != null : "A repair should have at least some jobs scheduled"; - if (job.addTree(request, tree) == 0) { logger.debug("All trees received for " + session.getName() + "/" + request.cf.right); @@ -704,14 +705,14 @@ public class AntiEntropyService } catch (InterruptedException e) { - throw new RuntimeException("Interrupted while waiting for repair: repair will continue in the background."); + throw new RuntimeException("Interrupted while waiting for repair."); } finally { + // mark this session as terminated + terminate(); FailureDetector.instance.unregisterFailureDetectionEventListener(this); Gossiper.instance.unregister(this); - // mark this session as terminated - terminated = true; AntiEntropyService.instance.sessions.remove(getName()); } } @@ -724,28 +725,36 @@ public class AntiEntropyService return terminated; } + public void terminate() + { + terminated = true; + jobs.clear(); + activeJobs.clear(); + } + /** * clear all RepairJobs and terminate this session. */ public void forceShutdown() { - jobs.clear(); - activeJobs.clear(); differencingDone.signalAll(); completed.signalAll(); } void completed(Differencer differencer) { - if (terminated) - return; - logger.debug(String.format("[repair #%s] Repair completed between %s and %s on %s", getName(), differencer.r1.endpoint, differencer.r2.endpoint, differencer.cfname)); RepairJob job = activeJobs.get(differencer.cfname); + if (job == null) + { + assert terminated; + return; + } + if (job.completedSynchronization(differencer)) { activeJobs.remove(differencer.cfname);