Return-Path: X-Original-To: apmail-hbase-commits-archive@www.apache.org Delivered-To: apmail-hbase-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 2D25410406 for ; Thu, 18 Jul 2013 21:10:06 +0000 (UTC) Received: (qmail 87731 invoked by uid 500); 18 Jul 2013 21:10:06 -0000 Delivered-To: apmail-hbase-commits-archive@hbase.apache.org Received: (qmail 87694 invoked by uid 500); 18 Jul 2013 21:10:06 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 87687 invoked by uid 99); 18 Jul 2013 21:10:06 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 18 Jul 2013 21:10:06 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 18 Jul 2013 21:10:04 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 1CE2723889FA; Thu, 18 Jul 2013 21:09:44 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1504661 - in /hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver: ReplicationSource.java ReplicationSourceManager.java Date: Thu, 18 Jul 2013 21:09:44 -0000 To: commits@hbase.apache.org From: larsh@apache.org X-Mailer: svnmailer-1.0.9 Message-Id: <20130718210944.1CE2723889FA@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: larsh Date: Thu Jul 18 21:09:43 2013 New Revision: 1504661 URL: http://svn.apache.org/r1504661 Log: HBASE-8599 HLogs in ZK are not cleaned up when replication lag is minimal (Varun Sharma) Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java?rev=1504661&r1=1504660&r2=1504661&view=diff ============================================================================== --- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java (original) +++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java Thu Jul 18 21:09:43 2013 @@ -282,6 +282,10 @@ public class ReplicationSource extends T sleepMultiplier++; } continue; + } else if (oldPath != null && !oldPath.getName().equals(getCurrentPath().getName())) { + this.manager.cleanOldLogs(getCurrentPath().getName(), + this.peerId, + this.replicationQueueInfo.isQueueRecovered()); } boolean currentWALisBeingWrittenTo = false; //For WAL files we own (rather than recovered), take a snapshot of whether the Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java?rev=1504661&r1=1504660&r2=1504661&view=diff ============================================================================== --- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java (original) +++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java Thu Jul 18 21:09:43 2013 @@ -155,15 +155,29 @@ public class ReplicationSourceManager { if (holdLogInZK) { return; } + cleanOldLogs(key, id, queueRecovered); + } + + /** + * Cleans a log file and all older files from ZK. Called when we are sure that a + * log file is closed and has no more entries. + * @param key Path to the log + * @param id id of the peer cluster + * @param queueRecovered Whether this is a recovered queue + */ + public void cleanOldLogs(String key, + String id, + boolean queueRecovered) { synchronized (this.hlogsById) { SortedSet hlogs = this.hlogsById.get(id); - if (!queueRecovered && !hlogs.first().equals(key)) { - SortedSet hlogSet = hlogs.headSet(key); - for (String hlog : hlogSet) { - this.zkHelper.removeLogFromList(hlog, id); - } - hlogSet.clear(); + if (queueRecovered || hlogs.first().equals(key)) { + return; + } + SortedSet hlogSet = hlogs.headSet(key); + for (String hlog : hlogSet) { + this.zkHelper.removeLogFromList(hlog, id); } + hlogSet.clear(); } }