hbase-commits mailing list archives

From jdcry...@apache.org
Subject svn commit: r1379290 - in /hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase: regionserver/HRegion.java regionserver/wal/HLog.java replication/ReplicationZookeeper.java replication/regionserver/ReplicationSource.java
Date Fri, 31 Aug 2012 05:01:00 GMT
Author: jdcryans
Date: Fri Aug 31 05:01:00 2012
New Revision: 1379290

URL: http://svn.apache.org/viewvc?rev=1379290&view=rev
Log:
HBASE-6321  ReplicationSource dies reading the peer's id
HBASE-6647  [performance regression] appendNoSync/HBASE-4528 doesn't
            take deferred log flush into account

Modified:
    hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
    hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
    hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java
    hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java

Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1379290&r1=1379289&r2=1379290&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Fri Aug 31 05:01:00 2012
@@ -2212,10 +2212,8 @@ public class HRegion implements HeapSize
       // -------------------------
       // STEP 7. Sync wal.
       // -------------------------
-      if (walEdit.size() > 0 &&
-          (this.regionInfo.isMetaRegion() ||
-           !this.htableDescriptor.isDeferredLogFlush())) {
-        this.log.sync(txid);
+      if (walEdit.size() > 0) {
+        syncOrDefer(txid);
       }
       walSyncSuccessful = true;
       // ------------------------------------------------------------------
@@ -4314,10 +4312,8 @@ public class HRegion implements HeapSize
         }
 
         // 9. sync WAL if required
-        if (walEdit.size() > 0 &&
-            (this.regionInfo.isMetaRegion() ||
-             !this.htableDescriptor.isDeferredLogFlush())) {
-          this.log.sync(txid);
+        if (walEdit.size() > 0) {
+          syncOrDefer(txid);
         }
         walSyncSuccessful = true;
 
@@ -4515,7 +4511,7 @@ public class HRegion implements HeapSize
         releaseRowLock(lid);
       }
       if (writeToWAL) {
-        this.log.sync(txid); // sync the transaction log outside the rowlock
+        syncOrDefer(txid); // sync the transaction log outside the rowlock
       }
     } finally {
       closeRegionOperation();
@@ -4642,7 +4638,7 @@ public class HRegion implements HeapSize
         releaseRowLock(lid);
       }
       if (writeToWAL) {
-        this.log.sync(txid); // sync the transaction log outside the rowlock
+        syncOrDefer(txid); // sync the transaction log outside the rowlock
       }
     } finally {
       closeRegionOperation();
@@ -4738,7 +4734,7 @@ public class HRegion implements HeapSize
         releaseRowLock(lid);
       }
       if (writeToWAL) {
-        this.log.sync(txid); // sync the transaction log outside the rowlock
+        syncOrDefer(txid); // sync the transaction log outside the rowlock
       }
     } finally {
       closeRegionOperation();
@@ -5152,6 +5148,19 @@ public class HRegion implements HeapSize
   }
 
   /**
+   * Calls sync with the given transaction ID if the region's table is not
+   * deferring the log flush.
+   * @param txid the transaction ID to sync up to
+   * @throws IOException if anything goes wrong with DFS
+   */
+  private void syncOrDefer(long txid) throws IOException {
+    if (this.regionInfo.isMetaRegion() ||
+      !this.htableDescriptor.isDeferredLogFlush()) {
+      this.log.sync(txid);
+    }
+  }
+
+  /**
   * A mocked list implementation - discards all updates.
    */
   private static final List<KeyValue> MOCKED_LIST = new AbstractList<KeyValue>() {

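For context, deferred log flush is a per-table attribute; the new syncOrDefer() only skips the immediate sync when the table has opted into it. A minimal client-side sketch of enabling it follows (the table and family names are only illustrative, not part of this commit):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;

public class DeferredFlushTableExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    HBaseAdmin admin = new HBaseAdmin(conf);

    HTableDescriptor desc = new HTableDescriptor("test_deferred");
    desc.addFamily(new HColumnDescriptor("f"));
    // Edits are still appended to the WAL, but the sync is left to the
    // LogSyncer thread (see the HLog change below) instead of blocking
    // each write.
    desc.setDeferredLogFlush(true);

    admin.createTable(desc);
    admin.close();
  }
}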
Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java?rev=1379290&r1=1379289&r2=1379290&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java Fri Aug 31 05:01:00 2012
@@ -1187,10 +1187,14 @@ public class HLog implements Syncable {
   }
 
   /**
-   * This thread is responsible to call syncFs and buffer up the writers while
-   * it happens.
+   * This class is responsible for holding the HLog's appended Entry list
+   * and for syncing it at a configurable interval.
+   *
+   * Deferred log flushing piggybacks on this process by simply not sync'ing
+   * the appended Entry right away; it is either sync'd here or by another,
+   * non-deferred log flush happening outside of this thread.
    */
-   class LogSyncer extends HasThread {
+  class LogSyncer extends HasThread {
 
     private final long optionalFlushInterval;
 
@@ -1221,6 +1225,9 @@ public class HLog implements Syncable {
                 closeLogSyncer.wait(this.optionalFlushInterval);
               }
             }
+            // Calling sync since we waited or had unflushed entries.
+            // Entries that were appended but not sync'd are taken care of
+            // here, i.e. deferred log flush.
             sync();
           } catch (IOException e) {
             LOG.error("Error while syncing, requesting close of hlog ", e);

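The reworked javadoc above describes the sync loop only in prose. A standalone sketch of the same pattern follows; this is not the HLog code itself, just an illustration with assumed names and simplified bookkeeping:

import java.util.concurrent.atomic.AtomicLong;

public class PeriodicSyncerSketch extends Thread {
  private final long flushIntervalMs;   // analogous to HLog's optionalFlushInterval
  private final AtomicLong appendedTxid = new AtomicLong();
  private final AtomicLong syncedTxid = new AtomicLong();
  private volatile boolean closing = false;

  public PeriodicSyncerSketch(long flushIntervalMs) {
    this.flushIntervalMs = flushIntervalMs;
    setDaemon(true);
  }

  /** A deferred-flush append: the entry is recorded but not forced to disk. */
  public long appendNoSync() {
    return appendedTxid.incrementAndGet();
  }

  /** A non-deferred sync: pretends to push everything appended so far to disk. */
  public void sync() {
    syncedTxid.set(appendedTxid.get());
  }

  @Override
  public void run() {
    while (!closing) {
      try {
        Thread.sleep(flushIntervalMs);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return;
      }
      // Anything appended but not yet sync'd is flushed here, which is how
      // deferred log flush edits eventually reach disk.
      if (syncedTxid.get() < appendedTxid.get()) {
        sync();
      }
    }
  }

  public void shutdown() {
    closing = true;
    this.interrupt();
  }
}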
Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java?rev=1379290&r1=1379289&r2=1379290&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java Fri Aug 31 05:01:00 2012
@@ -29,6 +29,7 @@ import java.util.SortedMap;
 import java.util.SortedSet;
 import java.util.TreeMap;
 import java.util.TreeSet;
+import java.util.UUID;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.logging.Log;
@@ -38,7 +39,9 @@ import org.apache.hadoop.hbase.Abortable
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.replication.regionserver.Replication;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.zookeeper.ClusterId;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker;
@@ -238,19 +241,7 @@ public class ReplicationZookeeper {
     try {
       addresses = fetchSlavesAddresses(peer.getZkw());
     } catch (KeeperException ke) {
-      if (ke instanceof ConnectionLossException
-          || ke instanceof SessionExpiredException) {
-        LOG.warn(
-            "Lost the ZooKeeper connection for peer " + peer.getClusterKey(),
-            ke);
-        try {
-          peer.reloadZkWatcher();
-        } catch(IOException io) {
-          LOG.warn(
-              "Creation of ZookeeperWatcher failed for peer "
-                  + peer.getClusterKey(), io);
-        }
-      }
+      reconnectPeer(ke, peer);
       addresses = Collections.emptyList();
     }
     peer.setRegionServers(addresses);
@@ -798,6 +789,50 @@ public class ReplicationZookeeper {
     return data == null || data.length() == 0 ? 0 : Long.parseLong(data);
   }
 
+  /**
+   * Returns the UUID of the provided peer id. If a connection loss or session
+   * expiration happens, the ZK handler is reopened once; if it still doesn't
+   * work, this method bails out and returns null.
+   * @param peerId the peer's ID that will be converted into a UUID
+   * @return a UUID or null if there's a ZK connection issue
+   */
+  public UUID getPeerUUID(String peerId) {
+    ReplicationPeer peer = getPeerClusters().get(peerId);
+    UUID peerUUID = null;
+    try {
+      peerUUID = getUUIDForCluster(peer.getZkw());
+    } catch (KeeperException ke) {
+      reconnectPeer(ke, peer);
+    }
+    return peerUUID;
+  }
+
+  /**
+   * Get the UUID for the provided ZK watcher. Doesn't handle any ZK exceptions
+   * @param zkw watcher connected to an ensemble
+   * @return the UUID read from zookeeper
+   * @throws KeeperException
+   */
+  public UUID getUUIDForCluster(ZooKeeperWatcher zkw) throws KeeperException {
+    return UUID.fromString(ClusterId.readClusterIdZNode(zkw));
+  }
+
+  private void reconnectPeer(KeeperException ke, ReplicationPeer peer) {
+    if (ke instanceof ConnectionLossException
+      || ke instanceof SessionExpiredException) {
+      LOG.warn(
+        "Lost the ZooKeeper connection for peer " + peer.getClusterKey(),
+        ke);
+      try {
+        peer.reloadZkWatcher();
+      } catch(IOException io) {
+        LOG.warn(
+          "Creation of ZookeeperWatcher failed for peer "
+            + peer.getClusterKey(), io);
+      }
+    }
+  }
+
   public void registerRegionServerListener(ZooKeeperListener listener) {
     this.zookeeper.registerListener(listener);
   }

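Callers are expected to treat a null return from getPeerUUID() as a transient ZK problem and retry. A hedged sketch of that contract follows; the helper class and its parameters are hypothetical and only illustrate the calling pattern (the real caller is ReplicationSource, below):

import java.util.UUID;

public final class PeerUuidRetry {
  private PeerUuidRetry() {}

  /** Mirrors the shape of ReplicationZookeeper#getPeerUUID for illustration. */
  public interface UuidLookup {
    UUID getPeerUUID(String peerId);
  }

  public static UUID waitForPeerUUID(UuidLookup zkHelper, String peerId,
      long baseSleepMs, int maxRetries) throws InterruptedException {
    UUID peerUUID = null;
    int attempt = 0;
    while (peerUUID == null && attempt < maxRetries) {
      peerUUID = zkHelper.getPeerUUID(peerId);
      if (peerUUID == null) {
        // Back off linearly, like ReplicationSource's sleepMultiplier does.
        Thread.sleep(baseSleepMs * (++attempt));
      }
    }
    return peerUUID;  // may still be null if the peer never became reachable
  }
}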
Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java?rev=1379290&r1=1379289&r2=1379290&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java Fri Aug 31 05:01:00 2012
@@ -185,8 +185,7 @@ public class ReplicationSource extends T
     this.metrics = new ReplicationSourceMetrics(peerClusterZnode);
 
     try {
-      this.clusterId = UUID.fromString(ClusterId.readClusterIdZNode(zkHelper
-          .getZookeeperWatcher()));
+      this.clusterId = zkHelper.getUUIDForCluster(zkHelper.getZookeeperWatcher());
     } catch (KeeperException ke) {
       throw new IOException("Could not read cluster id", ke);
     }
@@ -245,13 +244,19 @@ public class ReplicationSource extends T
     if (!this.isActive()) {
       return;
     }
+    int sleepMultiplier = 1;
     // delay this until we are in an asynchronous thread
-    try {
-      this.peerClusterId = UUID.fromString(ClusterId
-          .readClusterIdZNode(zkHelper.getPeerClusters().get(peerId).getZkw()));
-    } catch (KeeperException ke) {
-      this.terminate("Could not read peer's cluster id", ke);
+    while (this.peerClusterId == null) {
+      this.peerClusterId = zkHelper.getPeerUUID(this.peerId);
+      if (this.peerClusterId == null) {
+        if (sleepForRetries("Cannot contact the peer's zk ensemble", sleepMultiplier)) {
+          sleepMultiplier++;
+        }
+      }
     }
+    // resetting to 1 to reuse later
+    sleepMultiplier = 1;
+
     LOG.info("Replicating "+clusterId + " -> " + peerClusterId);
 
     // If this is recovered, the queue is already full and the first log
@@ -265,7 +270,6 @@ public class ReplicationSource extends T
             peerClusterZnode, e);
       }
     }
-    int sleepMultiplier = 1;
     // Loop until we close down
     while (isActive()) {
       // Sleep until replication is enabled again

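The new run() loop above leans on ReplicationSource's existing sleepForRetries() helper instead of terminating the source. A standalone sketch of that linear backoff follows; the base sleep and cap values are assumptions for illustration, not the actual defaults:

public class BackoffSketch {

  /** Mirrors the shape of the lookup, e.g. zkHelper.getPeerUUID(peerId). */
  public interface PeerIdLookup {
    String lookup();
  }

  private final long sleepForRetries = 1000;    // base sleep in ms (assumed value)
  private final int maxRetriesMultiplier = 10;  // cap on the multiplier (assumed value)

  /**
   * Sleeps for sleepForRetries * sleepMultiplier ms and tells the caller
   * whether the multiplier may still grow (the real method also logs msg).
   */
  protected boolean sleepForRetries(String msg, int sleepMultiplier) {
    try {
      Thread.sleep(this.sleepForRetries * sleepMultiplier);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return sleepMultiplier < maxRetriesMultiplier;
  }

  /** The shape of the new loop: keep retrying until the peer's id can be read. */
  public String waitForPeerId(PeerIdLookup lookup) {
    String peerClusterId = null;
    int sleepMultiplier = 1;
    while (peerClusterId == null) {
      peerClusterId = lookup.lookup();
      if (peerClusterId == null
          && sleepForRetries("Cannot contact the peer's zk ensemble", sleepMultiplier)) {
        sleepMultiplier++;
      }
    }
    return peerClusterId;
  }
}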

