Return-Path: X-Original-To: apmail-hbase-commits-archive@www.apache.org Delivered-To: apmail-hbase-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id A8ED3DEE2 for ; Fri, 31 Aug 2012 05:01:49 +0000 (UTC) Received: (qmail 7981 invoked by uid 500); 31 Aug 2012 05:01:49 -0000 Delivered-To: apmail-hbase-commits-archive@hbase.apache.org Received: (qmail 7896 invoked by uid 500); 31 Aug 2012 05:01:48 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 7880 invoked by uid 99); 31 Aug 2012 05:01:48 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 31 Aug 2012 05:01:48 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 31 Aug 2012 05:01:45 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 013BC23888FE for ; Fri, 31 Aug 2012 05:01:01 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1379290 - in /hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase: regionserver/HRegion.java regionserver/wal/HLog.java replication/ReplicationZookeeper.java replication/regionserver/ReplicationSource.java Date: Fri, 31 Aug 2012 05:01:00 -0000 To: commits@hbase.apache.org From: jdcryans@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20120831050101.013BC23888FE@eris.apache.org> Author: jdcryans Date: Fri Aug 31 05:01:00 2012 New Revision: 1379290 URL: http://svn.apache.org/viewvc?rev=1379290&view=rev Log: HBASE-6321 ReplicationSource dies reading the peer's id HBASE-6647 [performance regression] appendNoSync/HBASE-4528 doesn't take deferred log flush into account Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1379290&r1=1379289&r2=1379290&view=diff ============================================================================== --- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original) +++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Fri Aug 31 05:01:00 2012 @@ -2212,10 +2212,8 @@ public class HRegion implements HeapSize // ------------------------- // STEP 7. Sync wal. // ------------------------- - if (walEdit.size() > 0 && - (this.regionInfo.isMetaRegion() || - !this.htableDescriptor.isDeferredLogFlush())) { - this.log.sync(txid); + if (walEdit.size() > 0) { + syncOrDefer(txid); } walSyncSuccessful = true; // ------------------------------------------------------------------ @@ -4314,10 +4312,8 @@ public class HRegion implements HeapSize } // 9. sync WAL if required - if (walEdit.size() > 0 && - (this.regionInfo.isMetaRegion() || - !this.htableDescriptor.isDeferredLogFlush())) { - this.log.sync(txid); + if (walEdit.size() > 0) { + syncOrDefer(txid); } walSyncSuccessful = true; @@ -4515,7 +4511,7 @@ public class HRegion implements HeapSize releaseRowLock(lid); } if (writeToWAL) { - this.log.sync(txid); // sync the transaction log outside the rowlock + syncOrDefer(txid); // sync the transaction log outside the rowlock } } finally { closeRegionOperation(); @@ -4642,7 +4638,7 @@ public class HRegion implements HeapSize releaseRowLock(lid); } if (writeToWAL) { - this.log.sync(txid); // sync the transaction log outside the rowlock + syncOrDefer(txid); // sync the transaction log outside the rowlock } } finally { closeRegionOperation(); @@ -4738,7 +4734,7 @@ public class HRegion implements HeapSize releaseRowLock(lid); } if (writeToWAL) { - this.log.sync(txid); // sync the transaction log outside the rowlock + syncOrDefer(txid); // sync the transaction log outside the rowlock } } finally { closeRegionOperation(); @@ -5152,6 +5148,19 @@ public class HRegion implements HeapSize } /** + * Calls sync with the given transaction ID if the region's table is not + * deferring it. + * @param txid should sync up to which transaction + * @throws IOException If anything goes wrong with DFS + */ + private void syncOrDefer(long txid) throws IOException { + if (this.regionInfo.isMetaRegion() || + !this.htableDescriptor.isDeferredLogFlush()) { + this.log.sync(txid); + } + } + + /** * A mocked list implementaion - discards all updates. */ private static final List MOCKED_LIST = new AbstractList() { Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java?rev=1379290&r1=1379289&r2=1379290&view=diff ============================================================================== --- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java (original) +++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java Fri Aug 31 05:01:00 2012 @@ -1187,10 +1187,14 @@ public class HLog implements Syncable { } /** - * This thread is responsible to call syncFs and buffer up the writers while - * it happens. + * This class is responsible to hold the HLog's appended Entry list + * and to sync them according to a configurable interval. + * + * Deferred log flushing works first by piggy backing on this process by + * simply not sync'ing the appended Entry. It can also be sync'd by other + * non-deferred log flushed entries outside of this thread. */ - class LogSyncer extends HasThread { + class LogSyncer extends HasThread { private final long optionalFlushInterval; @@ -1221,6 +1225,9 @@ public class HLog implements Syncable { closeLogSyncer.wait(this.optionalFlushInterval); } } + // Calling sync since we waited or had unflushed entries. + // Entries appended but not sync'd are taken care of here AKA + // deferred log flush sync(); } catch (IOException e) { LOG.error("Error while syncing, requesting close of hlog ", e); Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java?rev=1379290&r1=1379289&r2=1379290&view=diff ============================================================================== --- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java (original) +++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java Fri Aug 31 05:01:00 2012 @@ -29,6 +29,7 @@ import java.util.SortedMap; import java.util.SortedSet; import java.util.TreeMap; import java.util.TreeSet; +import java.util.UUID; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.logging.Log; @@ -38,7 +39,9 @@ import org.apache.hadoop.hbase.Abortable import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.replication.regionserver.Replication; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.zookeeper.ClusterId; import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener; import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker; @@ -238,19 +241,7 @@ public class ReplicationZookeeper { try { addresses = fetchSlavesAddresses(peer.getZkw()); } catch (KeeperException ke) { - if (ke instanceof ConnectionLossException - || ke instanceof SessionExpiredException) { - LOG.warn( - "Lost the ZooKeeper connection for peer " + peer.getClusterKey(), - ke); - try { - peer.reloadZkWatcher(); - } catch(IOException io) { - LOG.warn( - "Creation of ZookeeperWatcher failed for peer " - + peer.getClusterKey(), io); - } - } + reconnectPeer(ke, peer); addresses = Collections.emptyList(); } peer.setRegionServers(addresses); @@ -798,6 +789,50 @@ public class ReplicationZookeeper { return data == null || data.length() == 0 ? 0 : Long.parseLong(data); } + /** + * Returns the UUID of the provided peer id. Should a connection loss or session + * expiration happen, the ZK handler will be reopened once and if it still doesn't + * work then it will bail and return null. + * @param peerId the peer's ID that will be converted into a UUID + * @return a UUID or null if there's a ZK connection issue + */ + public UUID getPeerUUID(String peerId) { + ReplicationPeer peer = getPeerClusters().get(peerId); + UUID peerUUID = null; + try { + peerUUID = getUUIDForCluster(peer.getZkw()); + } catch (KeeperException ke) { + reconnectPeer(ke, peer); + } + return peerUUID; + } + + /** + * Get the UUID for the provided ZK watcher. Doesn't handle any ZK exceptions + * @param zkw watcher connected to an ensemble + * @return the UUID read from zookeeper + * @throws KeeperException + */ + public UUID getUUIDForCluster(ZooKeeperWatcher zkw) throws KeeperException { + return UUID.fromString(ClusterId.readClusterIdZNode(zkw)); + } + + private void reconnectPeer(KeeperException ke, ReplicationPeer peer) { + if (ke instanceof ConnectionLossException + || ke instanceof SessionExpiredException) { + LOG.warn( + "Lost the ZooKeeper connection for peer " + peer.getClusterKey(), + ke); + try { + peer.reloadZkWatcher(); + } catch(IOException io) { + LOG.warn( + "Creation of ZookeeperWatcher failed for peer " + + peer.getClusterKey(), io); + } + } + } + public void registerRegionServerListener(ZooKeeperListener listener) { this.zookeeper.registerListener(listener); } Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java?rev=1379290&r1=1379289&r2=1379290&view=diff ============================================================================== --- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java (original) +++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java Fri Aug 31 05:01:00 2012 @@ -185,8 +185,7 @@ public class ReplicationSource extends T this.metrics = new ReplicationSourceMetrics(peerClusterZnode); try { - this.clusterId = UUID.fromString(ClusterId.readClusterIdZNode(zkHelper - .getZookeeperWatcher())); + this.clusterId = zkHelper.getUUIDForCluster(zkHelper.getZookeeperWatcher()); } catch (KeeperException ke) { throw new IOException("Could not read cluster id", ke); } @@ -245,13 +244,19 @@ public class ReplicationSource extends T if (!this.isActive()) { return; } + int sleepMultiplier = 1; // delay this until we are in an asynchronous thread - try { - this.peerClusterId = UUID.fromString(ClusterId - .readClusterIdZNode(zkHelper.getPeerClusters().get(peerId).getZkw())); - } catch (KeeperException ke) { - this.terminate("Could not read peer's cluster id", ke); + while (this.peerClusterId == null) { + this.peerClusterId = zkHelper.getPeerUUID(this.peerId); + if (this.peerClusterId == null) { + if (sleepForRetries("Cannot contact the peer's zk ensemble", sleepMultiplier)) { + sleepMultiplier++; + } + } } + // resetting to 1 to reuse later + sleepMultiplier = 1; + LOG.info("Replicating "+clusterId + " -> " + peerClusterId); // If this is recovered, the queue is already full and the first log @@ -265,7 +270,6 @@ public class ReplicationSource extends T peerClusterZnode, e); } } - int sleepMultiplier = 1; // Loop until we close down while (isActive()) { // Sleep until replication is enabled again