hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From la...@apache.org
Subject svn commit: r1425525 [4/7] - in /hbase/branches/0.94-test: ./ bin/ conf/ security/src/main/java/org/apache/hadoop/hbase/ipc/ security/src/main/java/org/apache/hadoop/hbase/security/access/ security/src/test/java/org/apache/hadoop/hbase/security/access/...
Date Sun, 23 Dec 2012 20:54:15 GMT
Modified: hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java?rev=1425525&r1=1425524&r2=1425525&view=diff
==============================================================================
--- hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java (original)
+++ hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java Sun Dec 23 20:54:12 2012
@@ -105,8 +105,6 @@ public class ReplicationSource extends T
   private int replicationQueueNbCapacity;
   // Our reader for the current log
   private HLog.Reader reader;
-  // Current position in the log
-  private long position = 0;
   // Last position in the log that we sent to ZooKeeper
   private long lastLoggedPosition = -1;
   // Path of the current log
@@ -132,10 +130,15 @@ public class ReplicationSource extends T
   private int currentNbEntries = 0;
   // Current number of operations (Put/Delete) that we need to replicate
   private int currentNbOperations = 0;
+  // Current size of data we need to replicate
+  private int currentSize = 0;
   // Indicates if this particular source is running
   private volatile boolean running = true;
   // Metrics for this source
   private ReplicationSourceMetrics metrics;
+  // Handle on the log reader helper
+  private ReplicationHLogReaderManager repLogReader;
+
 
   /**
    * Instantiation method used by region servers
@@ -183,7 +186,7 @@ public class ReplicationSource extends T
         this.conf.getLong("replication.source.sleepforretries", 1000);
     this.fs = fs;
     this.metrics = new ReplicationSourceMetrics(peerClusterZnode);
-
+    this.repLogReader = new ReplicationHLogReaderManager(this.fs, this.conf);
     try {
       this.clusterId = zkHelper.getUUIDForCluster(zkHelper.getZookeeperWatcher());
     } catch (KeeperException ke) {
@@ -263,8 +266,8 @@ public class ReplicationSource extends T
     // normally has a position (unless the RS failed between 2 logs)
     if (this.queueRecovered) {
       try {
-        this.position = this.zkHelper.getHLogRepPosition(
-            this.peerClusterZnode, this.queue.peek().getName());
+        this.repLogReader.setPosition(this.zkHelper.getHLogRepPosition(
+            this.peerClusterZnode, this.queue.peek().getName()));
       } catch (KeeperException e) {
         this.terminate("Couldn't get the position of this recovered queue " +
             peerClusterZnode, e);
@@ -322,6 +325,7 @@ public class ReplicationSource extends T
 
       boolean gotIOE = false;
       currentNbEntries = 0;
+      currentSize = 0;
       try {
         if (readAllEntriesToReplicateOrNextFile(currentWALisBeingWrittenTo)) {
           continue;
@@ -357,9 +361,7 @@ public class ReplicationSource extends T
         }
       } finally {
         try {
-          if (this.reader != null) {
-            this.reader.close();
-          }
+          this.repLogReader.closeReader();
         } catch (IOException e) {
           gotIOE = true;
           LOG.warn("Unable to finalize the tailing of a file", e);
@@ -370,10 +372,10 @@ public class ReplicationSource extends T
       // wait a bit and retry.
       // But if we need to stop, don't bother sleeping
       if (this.isActive() && (gotIOE || currentNbEntries == 0)) {
-        if (this.lastLoggedPosition != this.position) {
+        if (this.lastLoggedPosition != this.repLogReader.getPosition()) {
           this.manager.logPositionAndCleanOldLogs(this.currentPath,
-              this.peerClusterZnode, this.position, queueRecovered, currentWALisBeingWrittenTo);
-          this.lastLoggedPosition = this.position;
+              this.peerClusterZnode, this.repLogReader.getPosition(), queueRecovered, currentWALisBeingWrittenTo);
+          this.lastLoggedPosition = this.repLogReader.getPosition();
         }
         if (sleepForRetries("Nothing to replicate", sleepMultiplier)) {
           sleepMultiplier++;
@@ -405,11 +407,9 @@ public class ReplicationSource extends T
   protected boolean readAllEntriesToReplicateOrNextFile(boolean currentWALisBeingWrittenTo)
       throws IOException{
     long seenEntries = 0;
-    if (this.position != 0) {
-      this.reader.seek(this.position);
-    }
-    long startPosition = this.position;
-    HLog.Entry entry = readNextAndSetPosition();
+    this.repLogReader.seek();
+    HLog.Entry entry =
+        this.repLogReader.readNextAndSetPosition(this.entriesArray, this.currentNbEntries);
     while (entry != null) {
       WALEdit edit = entry.getEdit();
       this.metrics.logEditsReadRate.inc(1);
@@ -433,18 +433,18 @@ public class ReplicationSource extends T
           }
           currentNbOperations += countDistinctRowKeys(edit);
           currentNbEntries++;
+          currentSize += entry.getEdit().heapSize();
         } else {
           this.metrics.logEditsFilteredRate.inc(1);
         }
       }
       // Stop if too many entries or too big
-      if ((this.reader.getPosition() - startPosition)
-          >= this.replicationQueueSizeCapacity ||
+      if (currentSize >= this.replicationQueueSizeCapacity ||
           currentNbEntries >= this.replicationQueueNbCapacity) {
         break;
       }
       try {
-        entry = readNextAndSetPosition();
+        entry = this.repLogReader.readNextAndSetPosition(this.entriesArray, this.currentNbEntries);
       } catch (IOException ie) {
         LOG.debug("Break on IOE: " + ie.getMessage());
         break;
@@ -452,7 +452,7 @@ public class ReplicationSource extends T
     }
     LOG.debug("currentNbOperations:" + currentNbOperations +
         " and seenEntries:" + seenEntries +
-        " and size: " + (this.reader.getPosition() - startPosition));
+        " and size: " + this.currentSize);
     if (currentWALisBeingWrittenTo) {
       return false;
     }
@@ -461,16 +461,6 @@ public class ReplicationSource extends T
     return seenEntries == 0 && processEndOfFile();
   }
 
-  private HLog.Entry readNextAndSetPosition() throws IOException {
-    HLog.Entry entry = this.reader.next(entriesArray[currentNbEntries]);
-    // Store the position so that in the future the reader can start
-    // reading from here. If the above call to next() throws an
-    // exception, the position won't be changed and retry will happen
-    // from the last known good position
-    this.position = this.reader.getPosition();
-    return entry;
-  } 
-
   private void connectToPeers() {
     // Connect to peer cluster first, unless we have to stop
     while (this.isActive() && this.currentPeers.size() == 0) {
@@ -509,10 +499,9 @@ public class ReplicationSource extends T
   protected boolean openReader(int sleepMultiplier) {
     try {
       LOG.debug("Opening log for replication " + this.currentPath.getName() +
-          " at " + this.position);
+          " at " + this.repLogReader.getPosition());
       try {
-       this.reader = null;
-       this.reader = HLog.getReader(this.fs, this.currentPath, this.conf);
+        this.reader = repLogReader.openReader(this.currentPath);
       } catch (FileNotFoundException fnfe) {
         if (this.queueRecovered) {
           // We didn't find the log in the archive directory, look if it still
@@ -648,10 +637,10 @@ public class ReplicationSource extends T
         HRegionInterface rrs = getRS();
         LOG.debug("Replicating " + currentNbEntries);
         rrs.replicateLogEntries(Arrays.copyOf(this.entriesArray, currentNbEntries));
-        if (this.lastLoggedPosition != this.position) {
+        if (this.lastLoggedPosition != this.repLogReader.getPosition()) {
           this.manager.logPositionAndCleanOldLogs(this.currentPath,
-              this.peerClusterZnode, this.position, queueRecovered, currentWALisBeingWrittenTo);
-          this.lastLoggedPosition = this.position;
+              this.peerClusterZnode, this.repLogReader.getPosition(), queueRecovered, currentWALisBeingWrittenTo);
+          this.lastLoggedPosition = this.repLogReader.getPosition();
         }
         this.totalReplicatedEdits += currentNbEntries;
         this.metrics.shippedBatchesRate.inc(1);
@@ -721,7 +710,8 @@ public class ReplicationSource extends T
   protected boolean processEndOfFile() {
     if (this.queue.size() != 0) {
       this.currentPath = null;
-      this.position = 0;
+      this.repLogReader.finishCurrentFile();
+      this.reader = null;
       return true;
     } else if (this.queueRecovered) {
       this.manager.closeRecoveredQueue(this);

Modified: hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java?rev=1425525&r1=1425524&r2=1425525&view=diff
==============================================================================
--- hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java (original)
+++ hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java Sun Dec 23 20:54:12 2012
@@ -72,7 +72,6 @@ public class RemoteHTable implements HTa
   final Client client;
   final Configuration conf;
   final byte[] name;
-  final String accessToken;
   final int maxRetries;
   final long sleepTime;
 
@@ -81,10 +80,6 @@ public class RemoteHTable implements HTa
       final long startTime, final long endTime, final int maxVersions) {
     StringBuffer sb = new StringBuffer();
     sb.append('/');
-    if (accessToken != null) {
-      sb.append(accessToken);
-      sb.append('/');
-    }
     sb.append(Bytes.toStringBinary(name));
     sb.append('/');
     sb.append(Bytes.toStringBinary(row));
@@ -142,6 +137,29 @@ public class RemoteHTable implements HTa
     return sb.toString();
   }
 
+  protected String buildMultiRowSpec(final byte[][] rows, int maxVersions) {
+    StringBuilder sb = new StringBuilder();
+    sb.append('/');
+    sb.append(Bytes.toStringBinary(name));
+    sb.append("/multiget/");
+    if (rows == null || rows.length == 0) {
+      return sb.toString();
+    }
+    sb.append("?");
+    for(int i=0; i<rows.length; i++) {
+      byte[] rk = rows[i];
+      if (i != 0) {
+        sb.append('&');
+      }
+      sb.append("row=");
+      sb.append(Bytes.toStringBinary(rk));
+    }
+    sb.append("&v=");
+    sb.append(maxVersions);
+
+    return sb.toString();
+  }
+
   protected Result[] buildResultFromModel(final CellSetModel model) {
     List<Result> results = new ArrayList<Result>();
     for (RowModel row: model.getRows()) {
@@ -187,7 +205,9 @@ public class RemoteHTable implements HTa
    * @param client
    * @param name
    * @param accessToken
+   * @deprecated accessToken is not used and will be removed
    */
+  @Deprecated
   public RemoteHTable(Client client, String name, String accessToken) {
     this(client, HBaseConfiguration.create(), Bytes.toBytes(name), accessToken);
   }
@@ -197,8 +217,20 @@ public class RemoteHTable implements HTa
    * @param client
    * @param conf
    * @param name
+   */
+  public RemoteHTable(Client client, Configuration conf, String name) {
+    this(client, conf, Bytes.toBytes(name), null);
+  }
+
+  /**
+   * Constructor
+   * @param client
+   * @param conf
+   * @param name
    * @param accessToken
+   * @deprecated accessToken is not used and will be removed
    */
+  @Deprecated
   public RemoteHTable(Client client, Configuration conf, String name,
       String accessToken) {
     this(client, conf, Bytes.toBytes(name), accessToken);
@@ -206,14 +238,28 @@ public class RemoteHTable implements HTa
 
   /**
    * Constructor
+   * @param client
+   * @param conf
+   * @param name
+   */
+  public RemoteHTable(Client client, Configuration conf, byte[] name) {
+    this(client, conf, name, null);
+  }
+
+  /**
+   * Constructor
+   * @param client
    * @param conf
+   * @param name
+   * @param accessToken
+   * @deprecated accessToken is not used and will be removed
    */
+  @Deprecated
   public RemoteHTable(Client client, Configuration conf, byte[] name,
       String accessToken) {
     this.client = client;
     this.conf = conf;
     this.name = name;
-    this.accessToken = accessToken;
     this.maxRetries = conf.getInt("hbase.rest.client.max.retries", 10);
     this.sleepTime = conf.getLong("hbase.rest.client.sleep", 1000);
   }
@@ -229,10 +275,6 @@ public class RemoteHTable implements HTa
   public HTableDescriptor getTableDescriptor() throws IOException {
     StringBuilder sb = new StringBuilder();
     sb.append('/');
-    if (accessToken != null) {
-      sb.append(accessToken);
-      sb.append('/');
-    }
     sb.append(Bytes.toStringBinary(name));
     sb.append('/');
     sb.append("schema");
@@ -267,30 +309,68 @@ public class RemoteHTable implements HTa
     if (get.getFilter() != null) {
       LOG.warn("filters not supported on gets");
     }
+    Result[] results = getResults(spec);
+    if (results.length > 0) {
+      if (results.length > 1) {
+        LOG.warn("too many results for get (" + results.length + ")");
+      }
+      return results[0];
+    } else {
+      return new Result();
+    }
+  }
+
+  public Result[] get(List<Get> gets) throws IOException {
+    byte[][] rows = new byte[gets.size()][];
+    int maxVersions = 1;
+    int count = 0;
+
+    for (Get g : gets) {
+
+      if (count == 0) {
+        maxVersions = g.getMaxVersions();
+      } else if (g.getMaxVersions() != maxVersions) {
+        LOG.warn("MaxVersions on Gets do not match, using the first in the list ("
+            + maxVersions +")");
+      }
+
+      if (g.getFilter() != null) {
+        LOG.warn("filters not supported on gets");
+      }
+
+      rows[count] = g.getRow();
+      count++;
+    }
+
+    String spec = buildMultiRowSpec(rows, maxVersions);
+
+    return getResults(spec);
+  }
+
+  private Result[] getResults(String spec) throws IOException {
     for (int i = 0; i < maxRetries; i++) {
       Response response = client.get(spec, Constants.MIMETYPE_PROTOBUF);
       int code = response.getCode();
       switch (code) {
-      case 200:
-        CellSetModel model = new CellSetModel();
-        model.getObjectFromMessage(response.getBody());
-        Result[] results = buildResultFromModel(model);
-        if (results.length > 0) {
-          if (results.length > 1) {
-            LOG.warn("too many results for get (" + results.length + ")");
+        case 200:
+          CellSetModel model = new CellSetModel();
+          model.getObjectFromMessage(response.getBody());
+          Result[] results = buildResultFromModel(model);
+          if (results.length > 0) {
+            return results;
           }
-          return results[0];
-        }
-        // fall through
-      case 404:
-        return new Result();
-      case 509:
-        try {
-          Thread.sleep(sleepTime);
-        } catch (InterruptedException e) { }
-        break;
-      default:
-        throw new IOException("get request returned " + code);
+          // fall through
+        case 404:
+          return new Result[0];
+
+        case 509:
+          try {
+            Thread.sleep(sleepTime);
+          } catch (InterruptedException e) {
+          }
+          break;
+        default:
+          throw new IOException("get request returned " + code);
       }
     }
     throw new IOException("get request timed out");
@@ -306,10 +386,6 @@ public class RemoteHTable implements HTa
     CellSetModel model = buildModelFromPut(put);
     StringBuilder sb = new StringBuilder();
     sb.append('/');
-    if (accessToken != null) {
-      sb.append(accessToken);
-      sb.append('/');      
-    }
     sb.append(Bytes.toStringBinary(name));
     sb.append('/');
     sb.append(Bytes.toStringBinary(put.getRow()));
@@ -364,10 +440,6 @@ public class RemoteHTable implements HTa
     // build path for multiput
     StringBuilder sb = new StringBuilder();
     sb.append('/');
-    if (accessToken != null) {
-      sb.append(accessToken);
-      sb.append('/');      
-    }
     sb.append(Bytes.toStringBinary(name));
     sb.append("/$multiput"); // can be any nonexistent row
     for (int i = 0; i < maxRetries; i++) {
@@ -433,10 +505,6 @@ public class RemoteHTable implements HTa
       }
       StringBuffer sb = new StringBuffer();
       sb.append('/');
-      if (accessToken != null) {
-        sb.append(accessToken);
-        sb.append('/');
-      }
       sb.append(Bytes.toStringBinary(name));
       sb.append('/');
       sb.append("scanner");
@@ -575,10 +643,16 @@ public class RemoteHTable implements HTa
     throw new IOException("getRowOrBefore not supported");
   }
 
+  /**
+   * @deprecated {@link RowLock} and associated operations are deprecated
+   */
   public RowLock lockRow(byte[] row) throws IOException {
     throw new IOException("lockRow not implemented");
   }
 
+  /**
+   * @deprecated {@link RowLock} and associated operations are deprecated
+   */
   public void unlockRow(RowLock rl) throws IOException {
     throw new IOException("unlockRow not implemented");
   }
@@ -591,10 +665,6 @@ public class RemoteHTable implements HTa
     CellSetModel model = buildModelFromPut(put);
     StringBuilder sb = new StringBuilder();
     sb.append('/');
-    if (accessToken != null) {
-      sb.append(accessToken);
-      sb.append('/');
-    }
     sb.append(Bytes.toStringBinary(name));
     sb.append('/');
     sb.append(Bytes.toStringBinary(put.getRow()));
@@ -630,10 +700,6 @@ public class RemoteHTable implements HTa
     CellSetModel model = buildModelFromPut(put);
     StringBuilder sb = new StringBuilder();
     sb.append('/');
-    if (accessToken != null) {
-      sb.append(accessToken);
-      sb.append('/');
-    }
     sb.append(Bytes.toStringBinary(name));
     sb.append('/');
     sb.append(Bytes.toStringBinary(row));
@@ -690,11 +756,6 @@ public class RemoteHTable implements HTa
   }
 
   @Override
-  public Result[] get(List<Get> gets) throws IOException {
-    throw new IOException("get(List<Get>) not supported");
-  }
-
-  @Override
   public <T extends CoprocessorProtocol> T coprocessorProxy(Class<T> protocol,
       byte[] row) {
     throw new

Modified: hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/security/User.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/security/User.java?rev=1425525&r1=1425524&r2=1425525&view=diff
==============================================================================
--- hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/security/User.java (original)
+++ hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/security/User.java Sun Dec 23 20:54:12 2012
@@ -22,6 +22,7 @@ package org.apache.hadoop.hbase.security
 
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.util.Methods;
 import org.apache.hadoop.mapred.JobConf;
@@ -216,12 +217,15 @@ public abstract class User {
   }
 
   /**
-   * Returns whether or not secure authentication is enabled for HBase
-   * (whether <code>hbase.security.authentication</code> is set to
-   * <code>kerberos</code>.
+   * Returns whether or not secure authentication is enabled for HBase.  Note that
+   * HBase security requires HDFS security to provide any guarantees, so this requires that
+   * both <code>hbase.security.authentication</code> and <code>hadoop.security.authentication</code>
+   * are set to <code>kerberos</code>.
    */
   public static boolean isHBaseSecurityEnabled(Configuration conf) {
-    return "kerberos".equalsIgnoreCase(conf.get(HBASE_SECURITY_CONF_KEY));
+    return "kerberos".equalsIgnoreCase(conf.get(HBASE_SECURITY_CONF_KEY)) &&
+        "kerberos".equalsIgnoreCase(
+            conf.get(CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION));
   }
 
   /* Concrete implementations */

Modified: hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/util/AbstractHBaseTool.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/util/AbstractHBaseTool.java?rev=1425525&r1=1425524&r2=1425525&view=diff
==============================================================================
--- hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/util/AbstractHBaseTool.java (original)
+++ hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/util/AbstractHBaseTool.java Sun Dec 23 20:54:12 2012
@@ -63,7 +63,7 @@ public abstract class AbstractHBaseTool 
   protected abstract void processOptions(CommandLine cmd);
 
   /** The "main function" of the tool */
-  protected abstract void doWork() throws Exception;
+  protected abstract int doWork() throws Exception;
 
   @Override
   public Configuration getConf() {
@@ -99,13 +99,14 @@ public abstract class AbstractHBaseTool 
 
     processOptions(cmd);
 
+    int ret = EXIT_FAILURE;
     try {
-      doWork();
+      ret = doWork();
     } catch (Exception e) {
       LOG.error("Error running command-line tool", e);
       return EXIT_FAILURE;
     }
-    return EXIT_SUCCESS;
+    return ret;
   }
 
   private boolean sanityCheckOptions(CommandLine cmd) {

Modified: hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/util/ChecksumType.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/util/ChecksumType.java?rev=1425525&r1=1425524&r2=1425525&view=diff
==============================================================================
--- hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/util/ChecksumType.java (original)
+++ hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/util/ChecksumType.java Sun Dec 23 20:54:12 2012
@@ -25,9 +25,6 @@ import java.util.zip.Checksum;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.util.ChecksumFactory;
-
 /**
  * Checksum types. The Checksum type is a one byte number
  * that stores a representation of the checksum algorithm
@@ -70,7 +67,7 @@ public enum ChecksumType {
         ctor = ChecksumFactory.newConstructor(PURECRC32);
         LOG.info("Checksum using " + PURECRC32);
       } catch (Exception e) {
-        LOG.info(PURECRC32 + " not available.");
+        LOG.trace(PURECRC32 + " not available.");
       }
       try {
         // The default checksum class name is java.util.zip.CRC32. 
@@ -80,7 +77,7 @@ public enum ChecksumType {
           LOG.info("Checksum can use " + JDKCRC);
         }
       } catch (Exception e) {
-        LOG.warn(JDKCRC + " not available. ",  e);
+        LOG.trace(JDKCRC + " not available.");
       }
     }
 
@@ -113,7 +110,7 @@ public enum ChecksumType {
         ctor = ChecksumFactory.newConstructor(PURECRC32C);
         LOG.info("Checksum can use " + PURECRC32C);
       } catch (Exception e) {
-        LOG.info(PURECRC32C + " not available. ");
+        LOG.trace(PURECRC32C + " not available.");
       }
     }
 

Modified: hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java?rev=1425525&r1=1425524&r2=1425525&view=diff
==============================================================================
--- hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java (original)
+++ hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java Sun Dec 23 20:54:12 2012
@@ -151,7 +151,7 @@ public abstract class FSUtils {
    */
   public static FSDataOutputStream create(FileSystem fs, Path path,
       FsPermission perm, boolean overwrite) throws IOException {
-    LOG.debug("Creating file:" + path + "with permission:" + perm);
+    LOG.debug("Creating file=" + path + " with permission=" + perm);
 
     return fs.create(path, perm, overwrite,
         fs.getConf().getInt("io.file.buffer.size", 4096),
@@ -1013,6 +1013,25 @@ public abstract class FSUtils {
   }
 
   /**
+   * Given a particular region dir, return all the familydirs inside it
+   *
+   * @param fs A file system for the Path
+   * @param regionDir Path to a specific region directory
+   * @return List of paths to valid family directories in region dir.
+   * @throws IOException
+   */
+  public static List<Path> getFamilyDirs(final FileSystem fs, final Path regionDir) throws IOException {
+    // assumes we are in a region dir.
+    FileStatus[] fds = fs.listStatus(regionDir, new FamilyDirFilter(fs));
+    List<Path> familyDirs = new ArrayList<Path>(fds.length);
+    for (FileStatus fdfs: fds) {
+      Path fdPath = fdfs.getPath();
+      familyDirs.add(fdPath);
+    }
+    return familyDirs;
+  }
+
+  /**
    * Filter for HFiles that excludes reference files.
    */
   public static class HFileFilter implements PathFilter {

Modified: hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java?rev=1425525&r1=1425524&r2=1425525&view=diff
==============================================================================
--- hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java (original)
+++ hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java Sun Dec 23 20:54:12 2012
@@ -46,6 +46,7 @@ import java.util.concurrent.atomic.Atomi
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -82,6 +83,7 @@ import org.apache.hadoop.hbase.io.hfile.
 import org.apache.hadoop.hbase.ipc.HRegionInterface;
 import org.apache.hadoop.hbase.master.MasterFileSystem;
 import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.HBaseFsck.ErrorReporter.ERROR_CODE;
@@ -93,6 +95,9 @@ import org.apache.hadoop.hbase.zookeeper
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
 import org.apache.zookeeper.KeeperException;
 
 import com.google.common.base.Joiner;
@@ -146,7 +151,7 @@ import com.google.common.collect.TreeMul
  * can be used to limit the kinds of repairs hbck will do.  See the code in
  * {@link #printUsageAndExit()} for more details.
  */
-public class HBaseFsck {
+public class HBaseFsck extends Configured implements Tool {
   public static final long DEFAULT_TIME_LAG = 60000; // default value of 1 minute
   public static final long DEFAULT_SLEEP_BEFORE_RERUN = 10000;
   private static final int MAX_NUM_THREADS = 50; // #threads to contact regions
@@ -159,7 +164,6 @@ public class HBaseFsck {
    * Internal resources
    **********************/
   private static final Log LOG = LogFactory.getLog(HBaseFsck.class.getName());
-  private Configuration conf;
   private ClusterStatus status;
   private HConnection connection;
   private HBaseAdmin admin;
@@ -176,12 +180,14 @@ public class HBaseFsck {
   private long timelag = DEFAULT_TIME_LAG; // tables whose modtime is older
   private boolean fixAssignments = false; // fix assignment errors?
   private boolean fixMeta = false; // fix meta errors?
+  private boolean checkHdfs = true; // load and check fs consistency?
   private boolean fixHdfsHoles = false; // fix fs holes?
   private boolean fixHdfsOverlaps = false; // fix fs overlaps (risky)
   private boolean fixHdfsOrphans = false; // fix fs holes (missing .regioninfo)
   private boolean fixTableOrphans = false; // fix fs holes (missing .tableinfo)
   private boolean fixVersionFile = false; // fix missing hbase.version file in hdfs
   private boolean fixSplitParents = false; // fix lingering split parents
+  private boolean fixReferenceFiles = false; // fix lingering reference store file
 
   // limit checking/fixes to listed tables, if empty attempt to check/fix all
   // -ROOT- and .META. are always checked
@@ -199,7 +205,7 @@ public class HBaseFsck {
   /*********
    * State
    *********/
-  private ErrorReporter errors = new PrintingErrorReporter();
+  final private ErrorReporter errors;
   int fixes = 0;
 
   /**
@@ -240,8 +246,9 @@ public class HBaseFsck {
    * @throws ZooKeeperConnectionException if unable to connect to ZooKeeper
    */
   public HBaseFsck(Configuration conf) throws MasterNotRunningException,
-      ZooKeeperConnectionException, IOException {
-    this.conf = conf;
+      ZooKeeperConnectionException, IOException, ClassNotFoundException {
+    super(conf);
+    errors = getErrorReporter(conf);
 
     int numThreads = conf.getInt("hbasefsck.numthreads", MAX_NUM_THREADS);
     executor = new ScheduledThreadPoolExecutor(numThreads);
@@ -258,8 +265,9 @@ public class HBaseFsck {
    *           if unable to connect to ZooKeeper
    */
   public HBaseFsck(Configuration conf, ExecutorService exec) throws MasterNotRunningException,
-      ZooKeeperConnectionException, IOException {
-    this.conf = conf;
+      ZooKeeperConnectionException, IOException, ClassNotFoundException {
+    super(conf);
+    errors = getErrorReporter(getConf());
     this.executor = exec;
   }
 
@@ -268,8 +276,8 @@ public class HBaseFsck {
    * online state.
    */
   public void connect() throws IOException {
-    admin = new HBaseAdmin(conf);
-    meta = new HTable(conf, HConstants.META_TABLE_NAME);
+    admin = new HBaseAdmin(getConf());
+    meta = new HTable(getConf(), HConstants.META_TABLE_NAME);
     status = admin.getMaster().getClusterStatus();
     connection = admin.getConnection();
   }
@@ -333,11 +341,11 @@ public class HBaseFsck {
    */
   public void offlineHdfsIntegrityRepair() throws IOException, InterruptedException {
     // Initial pass to fix orphans.
-    if (shouldFixHdfsOrphans() || shouldFixHdfsHoles()
-        || shouldFixHdfsOverlaps() || shouldFixTableOrphans()) {
+    if (shouldCheckHdfs() && (shouldFixHdfsOrphans() || shouldFixHdfsHoles()
+        || shouldFixHdfsOverlaps() || shouldFixTableOrphans())) {
       LOG.info("Loading regioninfos HDFS");
       // if nothing is happening this should always complete in two iterations.
-      int maxIterations = conf.getInt("hbase.hbck.integrityrepair.iterations.max", 3);
+      int maxIterations = getConf().getInt("hbase.hbck.integrityrepair.iterations.max", 3);
       int curIter = 0;
       do {
         clearState(); // clears hbck state and reset fixes to 0 and.
@@ -391,8 +399,10 @@ public class HBaseFsck {
     loadDeployedRegions();
 
     // load regiondirs and regioninfos from HDFS
-    loadHdfsRegionDirs();
-    loadHdfsRegionInfos();
+    if (shouldCheckHdfs()) {
+      loadHdfsRegionDirs();
+      loadHdfsRegionInfos();
+    }
 
     // Empty cells in .META.?
     reportEmptyMetaCells();
@@ -429,6 +439,8 @@ public class HBaseFsck {
       admin.setBalancerRunning(oldBalancer, false);
     }
 
+    offlineReferenceFileRepair();
+
     // Print table summary
     printTableSummary(tablesInfo);
     return errors.summarize();
@@ -455,7 +467,7 @@ public class HBaseFsck {
    */
   private void adoptHdfsOrphan(HbckInfo hi) throws IOException {
     Path p = hi.getHdfsRegionDir();
-    FileSystem fs = p.getFileSystem(conf);
+    FileSystem fs = p.getFileSystem(getConf());
     FileStatus[] dirs = fs.listStatus(p);
     if (dirs == null) {
       LOG.warn("Attempt to adopt ophan hdfs region skipped becuase no files present in " +
@@ -480,7 +492,7 @@ public class HBaseFsck {
         byte[] start, end;
         HFile.Reader hf = null;
         try {
-          CacheConfig cacheConf = new CacheConfig(conf);
+          CacheConfig cacheConf = new CacheConfig(getConf());
           hf = HFile.createReader(fs, hfile.getPath(), cacheConf);
           hf.loadFileInfo();
           KeyValue startKv = KeyValue.createKeyValueFromKey(hf.getFirstKey());
@@ -528,7 +540,7 @@ public class HBaseFsck {
     // create new region on hdfs.  move data into place.
     HRegionInfo hri = new HRegionInfo(template.getName(), orphanRegionRange.getFirst(), orphanRegionRange.getSecond());
     LOG.info("Creating new region : " + hri);
-    HRegion region = HBaseFsckRepair.createHDFSRegionDir(conf, hri, template);
+    HRegion region = HBaseFsckRepair.createHDFSRegionDir(getConf(), hri, template);
     Path target = region.getRegionDir();
 
     // rename all the data to new region
@@ -585,6 +597,67 @@ public class HBaseFsck {
   }
 
   /**
+   * Scan all the store file names to find any lingering reference files,
+   * which refer to some non-existing files. If "fix" option is enabled,
+   * any lingering reference file will be sidelined if found.
+   * <p>
+   * Lingering reference file prevents a region from opening. It has to
+   * be fixed before a cluster can start properly.
+   */
+  private void offlineReferenceFileRepair() throws IOException {
+    Configuration conf = getConf();
+    Path hbaseRoot = FSUtils.getRootDir(conf);
+    FileSystem fs = hbaseRoot.getFileSystem(conf);
+    Map<String, Path> allFiles = FSUtils.getTableStoreFilePathMap(fs, hbaseRoot);
+    for (Path path: allFiles.values()) {
+      boolean isReference = false;
+      try {
+        isReference = StoreFile.isReference(path);
+      } catch (Throwable t) {
+        // Ignore. Some files may not be store files at all.
+        // For example, files under .oldlogs folder in .META.
+        // Warning message is already logged by
+        // StoreFile#isReference.
+      }
+      if (!isReference) continue;
+
+      Path referredToFile = StoreFile.getReferredToFile(path);
+      if (fs.exists(referredToFile)) continue;  // good, expected
+
+      // Found a lingering reference file
+      errors.reportError(ERROR_CODE.LINGERING_REFERENCE_HFILE,
+        "Found lingering reference file " + path);
+      if (!shouldFixReferenceFiles()) continue;
+
+      // Now, trying to fix it since requested
+      boolean success = false;
+      String pathStr = path.toString();
+
+      // A reference file path should be like
+      // ${hbase.rootdir}/table_name/region_id/family_name/referred_file.region_name
+      // Walk back 3 more separators (searching from index - 1, since lastIndexOf
+      // includes fromIndex) so the sidelined copy keeps table/region/family/file.
+      int index = pathStr.lastIndexOf(Path.SEPARATOR_CHAR);
+      for (int i = 0; index > 0 && i < 3; i++) {
+        index = pathStr.lastIndexOf(Path.SEPARATOR_CHAR, index - 1);
+      }
+      if (index > 0) {
+        Path rootDir = getSidelineDir();
+        Path dst = new Path(rootDir, pathStr.substring(index + 1)); // relative child
+        fs.mkdirs(dst.getParent());
+        LOG.info("Trying to sideline reference file "
+          + path + " to " + dst);
+        setShouldRerun();
+
+        success = fs.rename(path, dst);
+      }
+      if (!success) {
+        LOG.error("Failed to sideline reference file " + path);
+      }
+    }
+  }
+
+  /**
    * TODO -- need to add tests for this.
    */
   private void reportEmptyMetaCells() {
@@ -640,7 +713,7 @@ public class HBaseFsck {
     }
 
     Path regioninfo = new Path(regionDir, HRegion.REGIONINFO_FILE);
-    FileSystem fs = regioninfo.getFileSystem(conf);
+    FileSystem fs = regioninfo.getFileSystem(getConf());
 
     FSDataInputStream in = fs.open(regioninfo);
     HRegionInfo hri = new HRegionInfo();
@@ -715,11 +788,11 @@ public class HBaseFsck {
       if (modTInfo == null) {
         // only executed once per table.
         modTInfo = new TableInfo(tableName);
-        Path hbaseRoot = FSUtils.getRootDir(conf);
+        Path hbaseRoot = FSUtils.getRootDir(getConf());
         tablesInfo.put(tableName, modTInfo);
         try {
           HTableDescriptor htd =
-              FSTableDescriptors.getTableDescriptor(hbaseRoot.getFileSystem(conf),
+              FSTableDescriptors.getTableDescriptor(hbaseRoot.getFileSystem(getConf()),
               hbaseRoot, tableName);
           modTInfo.htds.add(htd);
         } catch (IOException ioe) {
@@ -748,7 +821,7 @@ public class HBaseFsck {
    */
   private Set<String> getColumnFamilyList(Set<String> columns, HbckInfo hbi) throws IOException {
     Path regionDir = hbi.getHdfsRegionDir();
-    FileSystem fs = regionDir.getFileSystem(conf);
+    FileSystem fs = regionDir.getFileSystem(getConf());
     FileStatus[] subDirs = fs.listStatus(regionDir, new FSUtils.FamilyDirFilter(fs));
     for (FileStatus subdir : subDirs) {
       String columnfamily = subdir.getPath().getName();
@@ -771,7 +844,7 @@ public class HBaseFsck {
     for (String columnfamimly : columns) {
       htd.addFamily(new HColumnDescriptor(columnfamimly));
     }
-    FSTableDescriptors.createTableDescriptor(htd, conf, true);
+    FSTableDescriptors.createTableDescriptor(htd, getConf(), true);
     return true;
   }
   
@@ -787,12 +860,12 @@ public class HBaseFsck {
   public void fixOrphanTables() throws IOException {
     if (shouldFixTableOrphans() && !orphanTableDirs.isEmpty()) {
 
-      Path hbaseRoot = FSUtils.getRootDir(conf);
+      Path hbaseRoot = FSUtils.getRootDir(getConf());
       List<String> tmpList = new ArrayList<String>();
       tmpList.addAll(orphanTableDirs.keySet());
       HTableDescriptor[] htds = getHTableDescriptors(tmpList);
-      Iterator iter = orphanTableDirs.entrySet().iterator();
-      int j = 0; 
+      Iterator<Entry<String, Set<String>>> iter = orphanTableDirs.entrySet().iterator();
+      int j = 0;
       int numFailedCase = 0;
       while (iter.hasNext()) {
         Entry<String, Set<String>> entry = (Entry<String, Set<String>>) iter.next();
@@ -803,7 +876,7 @@ public class HBaseFsck {
             HTableDescriptor htd = htds[j];
             LOG.info("fixing orphan table: " + tableName + " from cache");
             FSTableDescriptors.createTableDescriptor(
-                hbaseRoot.getFileSystem(conf), hbaseRoot, htd, true);
+                hbaseRoot.getFileSystem(getConf()), hbaseRoot, htd, true);
             j++;
             iter.remove();
           }
@@ -842,8 +915,8 @@ public class HBaseFsck {
    * @return an open .META. HRegion
    */
   private HRegion createNewRootAndMeta() throws IOException {
-    Path rootdir = new Path(conf.get(HConstants.HBASE_DIR));
-    Configuration c = conf;
+    Path rootdir = new Path(getConf().get(HConstants.HBASE_DIR));
+    Configuration c = getConf();
     HRegionInfo rootHRI = new HRegionInfo(HRegionInfo.ROOT_REGIONINFO);
     MasterFileSystem.setInfoFamilyCachingForRoot(false);
     HRegionInfo metaHRI = new HRegionInfo(HRegionInfo.FIRST_META_REGIONINFO);
@@ -986,7 +1059,7 @@ public class HBaseFsck {
     for (TableInfo tInfo : tablesInfo.values()) {
       TableIntegrityErrorHandler handler;
       if (fixHoles || fixOverlaps) {
-        handler = tInfo.new HDFSIntegrityFixer(tInfo, errors, conf,
+        handler = tInfo.new HDFSIntegrityFixer(tInfo, errors, getConf(),
           fixHoles, fixOverlaps);
       } else {
         handler = tInfo.new IntegrityFixSuggester(tInfo, errors);
@@ -1001,7 +1074,7 @@ public class HBaseFsck {
 
   private Path getSidelineDir() throws IOException {
     if (sidelineDir == null) {
-      Path hbaseDir = FSUtils.getRootDir(conf);
+      Path hbaseDir = FSUtils.getRootDir(getConf());
       Path hbckDir = new Path(hbaseDir, HConstants.HBCK_SIDELINEDIR_NAME);
       sidelineDir = new Path(hbckDir, hbaseDir.getName() + "-"
           + startMillis);
@@ -1118,8 +1191,8 @@ public class HBaseFsck {
    */
   Path sidelineOldRootAndMeta() throws IOException {
     // put current -ROOT- and .META. aside.
-    Path hbaseDir = new Path(conf.get(HConstants.HBASE_DIR));
-    FileSystem fs = hbaseDir.getFileSystem(conf);
+    Path hbaseDir = new Path(getConf().get(HConstants.HBASE_DIR));
+    FileSystem fs = hbaseDir.getFileSystem(getConf());
     Path backupDir = getSidelineDir();
     fs.mkdirs(backupDir);
 
@@ -1151,7 +1224,7 @@ public class HBaseFsck {
    */
   private void loadDisabledTables()
   throws ZooKeeperConnectionException, IOException {
-    HConnectionManager.execute(new HConnectable<Void>(conf) {
+    HConnectionManager.execute(new HConnectable<Void>(getConf()) {
       @Override
       public Void connect(HConnection connection) throws IOException {
         ZooKeeperWatcher zkw = connection.getZooKeeperWatcher();
@@ -1179,8 +1252,8 @@ public class HBaseFsck {
    * regionInfoMap
    */
   public void loadHdfsRegionDirs() throws IOException, InterruptedException {
-    Path rootDir = new Path(conf.get(HConstants.HBASE_DIR));
-    FileSystem fs = rootDir.getFileSystem(conf);
+    Path rootDir = new Path(getConf().get(HConstants.HBASE_DIR));
+    FileSystem fs = rootDir.getFileSystem(getConf());
 
     // list all tables from HDFS
     List<FileStatus> tableDirs = Lists.newArrayList();
@@ -1208,8 +1281,8 @@ public class HBaseFsck {
         LOG.info("Trying to create a new " + HConstants.VERSION_FILE_NAME
             + " file.");
         setShouldRerun();
-        FSUtils.setVersion(fs, rootDir, conf.getInt(
-            HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000), conf.getInt(
+        FSUtils.setVersion(fs, rootDir, getConf().getInt(
+            HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000), getConf().getInt(
             HConstants.VERSION_FILE_WRITE_ATTEMPTS,
             HConstants.DEFAULT_VERSION_FILE_WRITE_ATTEMPTS));
       }
@@ -1335,8 +1408,8 @@ public class HBaseFsck {
       return;
     }
 
-    Path hbaseDir = new Path(conf.get(HConstants.HBASE_DIR));
-    FileSystem fs = hbaseDir.getFileSystem(conf);
+    Path hbaseDir = new Path(getConf().get(HConstants.HBASE_DIR));
+    FileSystem fs = hbaseDir.getFileSystem(getConf());
     UserGroupInformation ugi = User.getCurrent().getUGI();
     FileStatus[] files = fs.listStatus(hbaseDir);
     for (FileStatus file : files) {
@@ -1492,8 +1565,12 @@ public class HBaseFsck {
       errors.print(msg);
       undeployRegions(hbi);
       setShouldRerun();
-      HBaseFsckRepair.fixUnassigned(admin, hbi.getHdfsHRI());
-      HBaseFsckRepair.waitUntilAssigned(admin, hbi.getHdfsHRI());
+      HRegionInfo hri = hbi.getHdfsHRI();
+      if (hri == null) {
+        hri = hbi.metaEntry;
+      }
+      HBaseFsckRepair.fixUnassigned(admin, hri);
+      HBaseFsckRepair.waitUntilAssigned(admin, hri);
     }
   }
 
@@ -1505,7 +1582,8 @@ public class HBaseFsck {
     String descriptiveName = hbi.toString();
 
     boolean inMeta = hbi.metaEntry != null;
-    boolean inHdfs = hbi.getHdfsRegionDir()!= null;
+    // In case not checking HDFS, assume the region is on HDFS
+    boolean inHdfs = !shouldCheckHdfs() || hbi.getHdfsRegionDir() != null;
     boolean hasMetaAssignment = inMeta && hbi.metaEntry.regionServer != null;
     boolean isDeployed = !hbi.deployedOn.isEmpty();
     boolean isMultiplyDeployed = hbi.deployedOn.size() > 1;
@@ -1515,7 +1593,7 @@ public class HBaseFsck {
     boolean splitParent =
       (hbi.metaEntry == null)? false: hbi.metaEntry.isSplit() && hbi.metaEntry.isOffline();
     boolean shouldBeDeployed = inMeta && !isTableDisabled(hbi.metaEntry);
-    boolean recentlyModified = hbi.getHdfsRegionDir() != null &&
+    boolean recentlyModified = inHdfs &&
       hbi.getModTime() + timelag > System.currentTimeMillis();
 
     // ========== First the healthy cases =============
@@ -1558,7 +1636,7 @@ public class HBaseFsck {
         }
 
         LOG.info("Patching .META. with .regioninfo: " + hbi.getHdfsHRI());
-        HBaseFsckRepair.fixMetaHoleOnline(conf, hbi.getHdfsHRI());
+        HBaseFsckRepair.fixMetaHoleOnline(getConf(), hbi.getHdfsHRI());
 
         tryAssignmentRepair(hbi, "Trying to reassign region...");
       }
@@ -1574,7 +1652,7 @@ public class HBaseFsck {
         }
 
         LOG.info("Patching .META. with with .regioninfo: " + hbi.getHdfsHRI());
-        HBaseFsckRepair.fixMetaHoleOnline(conf, hbi.getHdfsHRI());
+        HBaseFsckRepair.fixMetaHoleOnline(getConf(), hbi.getHdfsHRI());
 
         tryAssignmentRepair(hbi, "Trying to fix unassigned region...");
       }
@@ -1736,7 +1814,7 @@ public class HBaseFsck {
     debugLsr(contained.getHdfsRegionDir());
 
     // rename the contained into the container.
-    FileSystem fs = targetRegionDir.getFileSystem(conf);
+    FileSystem fs = targetRegionDir.getFileSystem(getConf());
     FileStatus[] dirs = fs.listStatus(contained.getHdfsRegionDir());
 
     if (dirs == null) {
@@ -2351,7 +2429,7 @@ public class HBaseFsck {
     HTableDescriptor[] htd = new HTableDescriptor[0];
      try {
        LOG.info("getHTableDescriptors == tableNames => " + tableNames);
-       htd = new HBaseAdmin(conf).getTableDescriptors(tableNames);
+       htd = new HBaseAdmin(getConf()).getTableDescriptors(tableNames);
      } catch (IOException e) {
        LOG.debug("Exception getting table descriptors", e);
      }
@@ -2494,12 +2572,12 @@ public class HBaseFsck {
     };
 
     // Scan -ROOT- to pick up META regions
-    MetaScanner.metaScan(conf, visitor, null, null,
+    MetaScanner.metaScan(getConf(), visitor, null, null,
       Integer.MAX_VALUE, HConstants.ROOT_TABLE_NAME);
 
     if (!checkMetaOnly) {
       // Scan .META. to pick up user regions
-      MetaScanner.metaScan(conf, visitor);
+      MetaScanner.metaScan(getConf(), visitor);
     }
 
     errors.print("");
@@ -2753,6 +2831,12 @@ public class HBaseFsck {
     }
   }
 
+  private static ErrorReporter getErrorReporter(
+      final Configuration conf) throws ClassNotFoundException {
+    Class<? extends ErrorReporter> reporter = conf.getClass("hbasefsck.errorreporter", PrintingErrorReporter.class, ErrorReporter.class);
+    return (ErrorReporter)ReflectionUtils.newInstance(reporter, conf);
+  }
+
   public interface ErrorReporter {
     public static enum ERROR_CODE {
       UNKNOWN, NO_META_REGION, NULL_ROOT_REGION, NO_VERSION_FILE, NOT_IN_META_HDFS, NOT_IN_META,
@@ -2760,7 +2844,7 @@ public class HBaseFsck {
       MULTI_DEPLOYED, SHOULD_NOT_BE_DEPLOYED, MULTI_META_REGION, RS_CONNECT_FAILURE,
       FIRST_REGION_STARTKEY_NOT_EMPTY, LAST_REGION_ENDKEY_NOT_EMPTY, DUPE_STARTKEYS,
       HOLE_IN_REGION_CHAIN, OVERLAP_IN_REGION_CHAIN, REGION_CYCLE, DEGENERATE_REGION,
-      ORPHAN_HDFS_REGION, LINGERING_SPLIT_PARENT, NO_TABLEINFO_FILE
+      ORPHAN_HDFS_REGION, LINGERING_SPLIT_PARENT, NO_TABLEINFO_FILE, LINGERING_REFERENCE_HFILE
     }
     public void clear();
     public void report(String message);
@@ -2778,7 +2862,7 @@ public class HBaseFsck {
     public boolean tableHasErrors(TableInfo table);
   }
 
-  private static class PrintingErrorReporter implements ErrorReporter {
+  static class PrintingErrorReporter implements ErrorReporter {
     public int errorCount = 0;
     private int showProgress;
 
@@ -3128,6 +3212,14 @@ public class HBaseFsck {
     return fixMeta;
   }
 
+  public void setCheckHdfs(boolean checking) {
+    checkHdfs = checking;
+  }
+
+  boolean shouldCheckHdfs() {
+    return checkHdfs;
+  }
+
   public void setFixHdfsHoles(boolean shouldFix) {
     fixHdfsHoles = shouldFix;
   }
@@ -3184,6 +3276,14 @@ public class HBaseFsck {
     return fixSplitParents;
   }
 
+  public void setFixReferenceFiles(boolean shouldFix) {
+    fixReferenceFiles = shouldFix;
+  }
+
+  boolean shouldFixReferenceFiles() {
+    return fixReferenceFiles;
+  }
+
   public boolean shouldIgnorePreCheckPermission() {
     return ignorePreCheckPermission;
   }
@@ -3245,7 +3345,7 @@ public class HBaseFsck {
   }
 
   protected HFileCorruptionChecker createHFileCorruptionChecker(boolean sidelineCorruptHFiles) throws IOException {
-    return new HFileCorruptionChecker(conf, executor, sidelineCorruptHFiles);
+    return new HFileCorruptionChecker(getConf(), executor, sidelineCorruptHFiles);
   }
 
   public HFileCorruptionChecker getHFilecorruptionChecker() {
@@ -3283,6 +3383,8 @@ public class HBaseFsck {
     System.err.println("   -fix              Try to fix region assignments.  This is for backwards compatiblity");
     System.err.println("   -fixAssignments   Try to fix region assignments.  Replaces the old -fix");
     System.err.println("   -fixMeta          Try to fix meta problems.  This assumes HDFS region info is good.");
+    System.err.println("   -noHdfsChecking   Don't load/check region info from HDFS."
+        + " Assumes META region info is good. Won't check/fix any HDFS issue, e.g. hole, orphan, or overlap");
     System.err.println("   -fixHdfsHoles     Try to fix region holes in hdfs.");
     System.err.println("   -fixHdfsOrphans   Try to fix region dirs with no .regioninfo file in hdfs");
     System.err.println("   -fixTableOrphans  Try to fix table dirs with no .tableinfo file in hdfs (online mode only)");
@@ -3293,6 +3395,7 @@ public class HBaseFsck {
     System.err.println("   -maxOverlapsToSideline <n>  When fixing region overlaps, allow at most <n> regions to sideline per group. (n=" + DEFAULT_OVERLAPS_TO_SIDELINE +" by default)");
     System.err.println("   -fixSplitParents  Try to force offline split parents to be online.");
     System.err.println("   -ignorePreCheckPermission  ignore filesystem permission pre-check");
+    System.err.println("   -fixReferenceFiles  Try to offline lingering reference store files");
 
     System.err.println("");
     System.err.println("  Datafile Repair options: (expert features, use with caution!)");
@@ -3302,7 +3405,7 @@ public class HBaseFsck {
     System.err.println("");
     System.err.println("  Metadata Repair shortcuts");
     System.err.println("   -repair           Shortcut for -fixAssignments -fixMeta -fixHdfsHoles " +
-        "-fixHdfsOrphans -fixHdfsOverlaps -fixVersionFile -sidelineBigOverlaps");
+        "-fixHdfsOrphans -fixHdfsOverlaps -fixVersionFile -sidelineBigOverlaps -fixReferenceFiles");
     System.err.println("   -repairHoles      Shortcut for -fixAssignments -fixMeta -fixHdfsHoles");
 
     setRetCode(-2);
@@ -3316,7 +3419,6 @@ public class HBaseFsck {
    * @throws Exception
    */
   public static void main(String[] args) throws Exception {
-
     // create a fsck object
     Configuration conf = HBaseConfiguration.create();
     Path hbasedir = new Path(conf.get(HConstants.HBASE_DIR));
@@ -3324,12 +3426,14 @@ public class HBaseFsck {
     conf.set("fs.defaultFS", defaultFs.toString());     // for hadoop 0.21+
     conf.set("fs.default.name", defaultFs.toString());  // for hadoop 0.20
 
-    int numThreads = conf.getInt("hbasefsck.numthreads", MAX_NUM_THREADS);
-    ExecutorService exec = new ScheduledThreadPoolExecutor(numThreads);
-    HBaseFsck hbck = new HBaseFsck(conf, exec);
-    hbck.exec(exec, args);
-    int retcode = hbck.getRetCode();
-    Runtime.getRuntime().exit(retcode);
+    int ret = ToolRunner.run(new HBaseFsck(conf), args);
+    System.exit(ret);
+  }
+
+  @Override
+  public int run(String[] args) throws Exception {
+    exec(executor, args);
+    return getRetCode();
   }
 
   public HBaseFsck exec(ExecutorService exec, String[] args) throws KeeperException, IOException,
@@ -3386,6 +3490,8 @@ public class HBaseFsck {
         setFixAssignments(true);
       } else if (cmd.equals("-fixMeta")) {
         setFixMeta(true);
+      } else if (cmd.equals("-noHdfsChecking")) {
+        setCheckHdfs(false);
       } else if (cmd.equals("-fixHdfsHoles")) {
         setFixHdfsHoles(true);
       } else if (cmd.equals("-fixHdfsOrphans")) {
@@ -3406,6 +3512,8 @@ public class HBaseFsck {
         checkCorruptHFiles = true;
       } else if (cmd.equals("-sidelineCorruptHFiles")) {
         sidelineCorruptHFiles = true;
+      } else if (cmd.equals("-fixReferenceFiles")) {
+        setFixReferenceFiles(true);
       } else if (cmd.equals("-repair")) {
         // this attempts to merge overlapping hdfs regions, needs testing
         // under load
@@ -3417,6 +3525,8 @@ public class HBaseFsck {
         setFixVersionFile(true);
         setSidelineBigOverlaps(true);
         setFixSplitParents(false);
+        setCheckHdfs(true);
+        setFixReferenceFiles(true);
       } else if (cmd.equals("-repairHoles")) {
         // this will make all missing hdfs regions available but may lose data
         setFixHdfsHoles(true);
@@ -3426,6 +3536,7 @@ public class HBaseFsck {
         setFixHdfsOverlaps(false);
         setSidelineBigOverlaps(false);
         setFixSplitParents(false);
+        setCheckHdfs(true);
       } else if (cmd.equals("-maxOverlapsToSideline")) {
         if (i == args.length - 1) {
           System.err.println("-maxOverlapsToSideline needs a numeric value argument.");
@@ -3484,13 +3595,13 @@ public class HBaseFsck {
       setHFileCorruptionChecker(hfcc); // so we can get result
       Collection<String> tables = getIncludedTables();
       Collection<Path> tableDirs = new ArrayList<Path>();
-      Path rootdir = FSUtils.getRootDir(conf);
+      Path rootdir = FSUtils.getRootDir(getConf());
       if (tables.size() > 0) {
         for (String t : tables) {
           tableDirs.add(FSUtils.getTablePath(rootdir, t));
         }
       } else {
-        tableDirs = FSUtils.getTableDirs(FSUtils.getCurrentFileSystem(conf), rootdir);
+        tableDirs = FSUtils.getTableDirs(FSUtils.getCurrentFileSystem(getConf()), rootdir);
       }
       hfcc.checkTables(tableDirs);
       PrintWriter out = new PrintWriter(System.out);
@@ -3530,14 +3641,14 @@ public class HBaseFsck {
    * ls -r for debugging purposes
    */
   void debugLsr(Path p) throws IOException {
-    debugLsr(conf, p);
+    debugLsr(getConf(), p);
   }
 
   /**
    * ls -r for debugging purposes
    */
   public static void debugLsr(Configuration conf, Path p) throws IOException {
-    if (!LOG.isDebugEnabled()) {
+    if (!LOG.isDebugEnabled() || p == null) {
       return;
     }
     FileSystem fs = p.getFileSystem(conf);

Modified: hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/util/Threads.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/util/Threads.java?rev=1425525&r1=1425524&r2=1425525&view=diff
==============================================================================
--- hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/util/Threads.java (original)
+++ hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/util/Threads.java Sun Dec 23 20:54:12 2012
@@ -126,7 +126,7 @@ public class Threads {
   /**
    * @param millis How long to sleep for in milliseconds.
    */
-  public static void sleep(int millis) {
+  public static void sleep(long millis) {
     try {
       Thread.sleep(millis);
     } catch (InterruptedException e) {
@@ -158,15 +158,15 @@ public class Threads {
   }
 
   /**
-   * Create a new CachedThreadPool with a bounded number as the maximum 
+   * Create a new CachedThreadPool with a bounded number as the maximum
    * thread size in the pool.
-   * 
+   *
    * @param maxCachedThread the maximum thread could be created in the pool
    * @param timeout the maximum time to wait
    * @param unit the time unit of the timeout argument
    * @param threadFactory the factory to use when creating new threads
-   * @return threadPoolExecutor the cachedThreadPool with a bounded number 
-   * as the maximum thread size in the pool. 
+   * @return threadPoolExecutor the cachedThreadPool with a bounded number
+   * as the maximum thread size in the pool.
    */
   public static ThreadPoolExecutor getBoundedCachedThreadPool(
       int maxCachedThread, long timeout, TimeUnit unit,
@@ -178,12 +178,13 @@ public class Threads {
     boundedCachedThreadPool.allowCoreThreadTimeOut(true);
     return boundedCachedThreadPool;
   }
-  
-  
+
+
   /**
-   * Returns a {@link java.util.concurrent.ThreadFactory} that names each created thread uniquely,
-   * with a common prefix.
-   * @param prefix The prefix of every created Thread's name
+   * Returns a {@link java.util.concurrent.ThreadFactory} that names each
+   * created thread uniquely, with a common prefix.
+   *
+   * @param prefix  The prefix of every created Thread's name
    * @return a {@link java.util.concurrent.ThreadFactory} that names threads
    */
   public static ThreadFactory getNamedThreadFactory(final String prefix) {
@@ -205,6 +206,7 @@ public class Threads {
     };
   }
 
+
   /**
    * Get a named {@link ThreadFactory} that just builds daemon threads
    * @param prefix name prefix for all threads created from the factory

Modified: hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/zookeeper/HQuorumPeer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/zookeeper/HQuorumPeer.java?rev=1425525&r1=1425524&r2=1425525&view=diff
==============================================================================
--- hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/zookeeper/HQuorumPeer.java (original)
+++ hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/zookeeper/HQuorumPeer.java Sun Dec 23 20:54:12 2012
@@ -61,6 +61,12 @@ public class HQuorumPeer {
       writeMyID(zkProperties);
       QuorumPeerConfig zkConfig = new QuorumPeerConfig();
       zkConfig.parseProperties(zkProperties);
+
+      // login the zookeeper server principal (if using security)
+      ZKUtil.loginServer(conf, "hbase.zookeeper.server.keytab.file",
+        "hbase.zookeeper.server.kerberos.principal",
+        zkConfig.getClientPortAddress().getHostName());
+
       runZKServer(zkConfig);
     } catch (Exception e) {
       e.printStackTrace();

Modified: hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java?rev=1425525&r1=1425524&r2=1425525&view=diff
==============================================================================
--- hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java (original)
+++ hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java Sun Dec 23 20:54:12 2012
@@ -22,6 +22,7 @@ package org.apache.hadoop.hbase.zookeepe
 import java.io.IOException;
 import java.lang.management.ManagementFactory;
 import java.util.ArrayList;
+import java.util.LinkedList;
 import java.util.List;
 
 import org.apache.commons.logging.Log;
@@ -32,11 +33,16 @@ import org.apache.hadoop.hbase.util.Retr
 import org.apache.zookeeper.AsyncCallback;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.Op;
+import org.apache.zookeeper.OpResult;
 import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.ZooKeeper.States;
 import org.apache.zookeeper.data.ACL;
 import org.apache.zookeeper.data.Stat;
+import org.apache.zookeeper.proto.CreateRequest;
+import org.apache.zookeeper.proto.SetDataRequest;
 
 /**
  * A zookeeper that can handle 'recoverable' errors.
@@ -490,6 +496,61 @@ public class RecoverableZooKeeper {
     }
   }
 
+  /**
+   * Rebuild each ZooKeeper {@link Op} so that create and setData payloads go through
+   * appendMetaData before being passed to multi; delete ops are reused unchanged.
+   */
+  private Iterable<Op> prepareZKMulti(Iterable<Op> ops)
+  throws UnsupportedOperationException {
+    if(ops == null) return null;
+
+    List<Op> preparedOps = new LinkedList<Op>();
+    for (Op op : ops) {
+      if (op.getType() == ZooDefs.OpCode.create) {
+        CreateRequest create = (CreateRequest)op.toRequestRecord();
+        preparedOps.add(Op.create(create.getPath(), appendMetaData(create.getData()),
+          create.getAcl(), create.getFlags()));
+      } else if (op.getType() == ZooDefs.OpCode.delete) {
+        // no need to appendMetaData for delete
+        preparedOps.add(op);
+      } else if (op.getType() == ZooDefs.OpCode.setData) {
+        SetDataRequest setData = (SetDataRequest)op.toRequestRecord();
+        preparedOps.add(Op.setData(setData.getPath(), appendMetaData(setData.getData()),
+          setData.getVersion()));
+      } else {
+        throw new UnsupportedOperationException("Unexpected ZKOp type: " + op.getClass().getName());
+      }
+    }
+    return preparedOps;
+  }
+
+  /**
+   * Submit all ops (after prepareZKMulti) in one zk.multi call. CONNECTIONLOSS, SESSIONEXPIRED
+   * and OPERATIONTIMEOUT go through retryOrThrow; any other KeeperException is rethrown at once.
+   */
+  public List<OpResult> multi(Iterable<Op> ops)
+  throws KeeperException, InterruptedException {
+    RetryCounter retryCounter = retryCounterFactory.create();
+    Iterable<Op> multiOps = prepareZKMulti(ops);
+    while (true) {
+      try {
+        return zk.multi(multiOps);
+      } catch (KeeperException e) {
+        switch (e.code()) {
+          case CONNECTIONLOSS:
+          case SESSIONEXPIRED:
+          case OPERATIONTIMEOUT:
+            retryOrThrow(retryCounter, e, "multi");
+            break;
+
+          default:
+            throw e;
+        }
+      }
+      retryCounter.sleepUntilNextRetry(); // reached only if retryOrThrow returned normally
+      retryCounter.useRetry();
+    }
+  }
+
   private String findPreviousSequentialNode(String path)
     throws KeeperException, InterruptedException {
     int lastSlashIdx = path.lastIndexOf('/');

Modified: hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKTable.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKTable.java?rev=1425525&r1=1425524&r2=1425525&view=diff
==============================================================================
--- hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKTable.java (original)
+++ hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKTable.java Sun Dec 23 20:54:12 2012
@@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.zookeepe
 
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -29,6 +30,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.master.AssignmentManager;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp;
 import org.apache.zookeeper.KeeperException;
 
 /**
@@ -228,16 +230,19 @@ public class ZKTable {
       }
     }
     synchronized (this.cache) {
+      List<ZKUtilOp> ops = new LinkedList<ZKUtilOp>();
       if (settingToEnabled) {
-        ZKUtil.deleteNodeFailSilent(this.watcher, znode92);
+        ops.add(ZKUtilOp.deleteNodeFailSilent(znode92));
       }
       else {
-        ZKUtil.setData(this.watcher, znode92, Bytes.toBytes(state.toString()));
+        ops.add(ZKUtilOp.setData(znode92, Bytes.toBytes(state.toString())));
       }
-      // Set the current format znode after the 0.92 format znode.
+      // If not running multi-update either because of configuration or failure,
+      // set the current format znode after the 0.92 format znode.
       // This is so in the case of failure, the AssignmentManager is guaranteed to
       // see the state was not applied, since it uses the current format znode internally.
-      ZKUtil.setData(this.watcher, znode, Bytes.toBytes(state.toString()));
+      ops.add(ZKUtilOp.setData(znode, Bytes.toBytes(state.toString())));
+      ZKUtil.multiOrSequential(this.watcher, ops, true);
       this.cache.put(tableName, state);
     }
   }
@@ -292,13 +297,16 @@ public class ZKTable {
   public void setDeletedTable(final String tableName)
   throws KeeperException {
     synchronized (this.cache) {
-      ZKUtil.deleteNodeFailSilent(this.watcher,
-        ZKUtil.joinZNode(this.watcher.masterTableZNode92, tableName));
-      // Delete the current format znode after the 0.92 format znode.
-      // This is so in the case of failure, the AssignmentManager is guaranteed to
-      // see the table was not deleted, since it uses the current format znode internally.
-      ZKUtil.deleteNodeFailSilent(this.watcher,
-        ZKUtil.joinZNode(this.watcher.masterTableZNode, tableName));
+      List<ZKUtilOp> ops = new LinkedList<ZKUtilOp>();
+      ops.add(ZKUtilOp.deleteNodeFailSilent(
+        ZKUtil.joinZNode(this.watcher.masterTableZNode92, tableName)));
+      // If not running multi-update either because of configuration or failure,
+      // delete the current format znode after the 0.92 format znode.  This is so in the case of
+      // failure, the AssignmentManager is guaranteed to see the table was not deleted, since it
+      // uses the current format znode internally.
+      ops.add(ZKUtilOp.deleteNodeFailSilent(
+        ZKUtil.joinZNode(this.watcher.masterTableZNode, tableName)));
+      ZKUtil.multiOrSequential(this.watcher, ops, true);
       if (this.cache.remove(tableName) == null) {
         LOG.warn("Moving table " + tableName + " state to deleted but was " +
           "already deleted");

Modified: hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java?rev=1425525&r1=1425524&r2=1425525&view=diff
==============================================================================
--- hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java (original)
+++ hbase/branches/0.94-test/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java Sun Dec 23 20:54:12 2012
@@ -24,10 +24,21 @@ import java.io.IOException;
 import java.io.InputStreamReader;
 import java.io.PrintWriter;
 import java.net.InetSocketAddress;
+import java.net.InetAddress;
 import java.net.Socket;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Properties;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.security.auth.login.LoginException;
+import javax.security.auth.login.AppConfigurationEntry;
+import javax.security.auth.login.AppConfigurationEntry.LoginModuleControlFlag;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.authentication.util.KerberosUtil;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
@@ -39,15 +50,24 @@ import org.apache.hadoop.hbase.ServerNam
 import org.apache.hadoop.hbase.executor.RegionTransitionData;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp.CreateAndFailSilent;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp.DeleteNodeFailSilent;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp.SetData;
 import org.apache.zookeeper.AsyncCallback;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.NoNodeException;
+import org.apache.zookeeper.Op;
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.ZooDefs.Ids;
 import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.data.ACL;
 import org.apache.zookeeper.data.Stat;
+import org.apache.zookeeper.client.ZooKeeperSaslClient;
+import org.apache.zookeeper.server.ZooKeeperSaslServer;
+import org.apache.zookeeper.proto.CreateRequest;
+import org.apache.zookeeper.proto.DeleteRequest;
+import org.apache.zookeeper.proto.SetDataRequest;
 
 /**
  * Internal HBase utility class for ZooKeeper.
@@ -108,6 +128,170 @@ public class ZKUtil {
         retry, retryIntervalMillis);
   }
 
+  /**
+   * Log in the current zookeeper server process using the given configuration
+   * keys for the credential file and login principal.
+   *
+   * <p><strong>This is only applicable when running on secure hbase</strong>
+   * On regular HBase (without security features), this will safely be ignored.
+   * </p>
+   *
+   * @param conf The configuration data to use
+   * @param keytabFileKey Property key used to configure the path to the credential file
+   * @param userNameKey Property key used to configure the login principal
+   * @param hostname Current hostname to use in any credentials
+   * @throws IOException underlying exception from SecurityUtil.login() call
+   */
+  public static void loginServer(Configuration conf, String keytabFileKey,
+      String userNameKey, String hostname) throws IOException {
+    login(conf, keytabFileKey, userNameKey, hostname,
+          ZooKeeperSaslServer.LOGIN_CONTEXT_NAME_KEY,
+          JaasConfiguration.SERVER_KEYTAB_KERBEROS_CONFIG_NAME);
+  }
+
+  /**
+   * Log in the current zookeeper client using the given configuration
+   * keys for the credential file and login principal.
+   *
+   * <p><strong>This is only applicable when running on secure hbase</strong>
+   * On regular HBase (without security features), this will safely be ignored.
+   * </p>
+   *
+   * @param conf The configuration data to use
+   * @param keytabFileKey Property key used to configure the path to the credential file
+   * @param userNameKey Property key used to configure the login principal
+   * @param hostname Current hostname to use in any credentials
+   * @throws IOException underlying exception from SecurityUtil.login() call
+   */
+  public static void loginClient(Configuration conf, String keytabFileKey,
+      String userNameKey, String hostname) throws IOException {
+    login(conf, keytabFileKey, userNameKey, hostname,
+          ZooKeeperSaslClient.LOGIN_CONTEXT_NAME_KEY,
+          JaasConfiguration.CLIENT_KEYTAB_KERBEROS_CONFIG_NAME);
+  }
+
+  /**
+   * Log in the current process using the given configuration keys for the
+   * credential file and login principal.
+   *
+   * <p><strong>This is only applicable when running on secure hbase</strong>
+   * On regular HBase (without security features), this will safely be ignored.
+   * </p>
+   *
+   * @param conf The configuration data to use
+   * @param keytabFileKey Property key used to configure the path to the credential file
+   * @param userNameKey Property key used to configure the login principal
+   * @param hostname Current hostname to use in any credentials
+   * @param loginContextProperty property name to expose the entry name
+   * @param loginContextName jaas entry name
+   * @throws IOException underlying exception from SecurityUtil.login() call
+   */
+  private static void login(Configuration conf, String keytabFileKey,
+      String userNameKey, String hostname,
+      String loginContextProperty, String loginContextName)
+      throws IOException {
+    if (!isSecureZooKeeper(conf))
+      return;
+
+    // User has specified a jaas.conf, keep this one as the good one.
+    // HBASE_OPTS="-Djava.security.auth.login.config=jaas.conf"
+    if (System.getProperty("java.security.auth.login.config") != null)
+      return;
+
+    String keytabFilename = conf.get(keytabFileKey);
+    String principalConfig = conf.get(userNameKey, System.getProperty("user.name"));
+    String principalName = SecurityUtil.getServerPrincipal(principalConfig, hostname);
+
+    // Initialize the "jaas.conf" for keyTab/principal
+    // (if keyTab is not specified, use the Ticket Cache)
+    // and set the zookeeper login context name.
+    JaasConfiguration jaasConf = new JaasConfiguration(loginContextName,
+      keytabFilename, principalName);
+    javax.security.auth.login.Configuration.setConfiguration(jaasConf);
+    System.setProperty(loginContextProperty, loginContextName);
+  }
+
+  /**
+   * A JAAS configuration that defines the login modules that we want to use for login.
+   */
+  private static class JaasConfiguration extends javax.security.auth.login.Configuration {
+    private static final String SERVER_KEYTAB_KERBEROS_CONFIG_NAME =
+      "zookeeper-server-keytab-kerberos";
+    private static final String CLIENT_KEYTAB_KERBEROS_CONFIG_NAME =
+      "zookeeper-client-keytab-kerberos";
+
+    private static final Map<String, String> BASIC_JAAS_OPTIONS =
+      new HashMap<String,String>();
+    static {
+      String jaasEnvVar = System.getenv("HBASE_JAAS_DEBUG");
+      if (jaasEnvVar != null && "true".equalsIgnoreCase(jaasEnvVar)) {
+        BASIC_JAAS_OPTIONS.put("debug", "true");
+      }
+    }
+
+    private static final Map<String,String> KEYTAB_KERBEROS_OPTIONS =
+      new HashMap<String,String>();
+    static {
+      KEYTAB_KERBEROS_OPTIONS.put("doNotPrompt", "true");
+      KEYTAB_KERBEROS_OPTIONS.put("storeKey", "true");
+      KEYTAB_KERBEROS_OPTIONS.put("refreshKrb5Config", "true");
+      KEYTAB_KERBEROS_OPTIONS.putAll(BASIC_JAAS_OPTIONS);
+    }
+
+    private static final AppConfigurationEntry KEYTAB_KERBEROS_LOGIN =
+      new AppConfigurationEntry(KerberosUtil.getKrb5LoginModuleName(),
+                                LoginModuleControlFlag.REQUIRED,
+                                KEYTAB_KERBEROS_OPTIONS);
+
+    private static final AppConfigurationEntry[] KEYTAB_KERBEROS_CONF =
+      new AppConfigurationEntry[]{KEYTAB_KERBEROS_LOGIN};
+
+    private javax.security.auth.login.Configuration baseConfig;
+    private final String loginContextName;
+    private final boolean useTicketCache;
+    private final String keytabFile;
+    private final String principal;
+
+    public JaasConfiguration(String loginContextName, String principal) {
+      this(loginContextName, principal, null, true);
+    }
+
+    public JaasConfiguration(String loginContextName, String principal, String keytabFile) {
+      this(loginContextName, principal, keytabFile, keytabFile == null || keytabFile.length() == 0);
+    }
+
+    private JaasConfiguration(String loginContextName, String principal,
+                             String keytabFile, boolean useTicketCache) {
+      try {
+        this.baseConfig = javax.security.auth.login.Configuration.getConfiguration();
+      } catch (SecurityException e) {
+        this.baseConfig = null;
+      }
+      this.loginContextName = loginContextName;
+      this.useTicketCache = useTicketCache;
+      this.keytabFile = keytabFile;
+      this.principal = principal;
+      LOG.info("JaasConfiguration loginContextName=" + loginContextName +
+               " principal=" + principal + " useTicketCache=" + useTicketCache +
+               " keytabFile=" + keytabFile);
+    }
+
+    @Override
+    public AppConfigurationEntry[] getAppConfigurationEntry(String appName) {
+      if (loginContextName.equals(appName)) {
+        if (!useTicketCache) {
+          KEYTAB_KERBEROS_OPTIONS.put("keyTab", keytabFile);
+          KEYTAB_KERBEROS_OPTIONS.put("useKeyTab", "true");
+        }
+        KEYTAB_KERBEROS_OPTIONS.put("principal", principal);
+        KEYTAB_KERBEROS_OPTIONS.put("useTicketCache", useTicketCache ? "true" : "false");
+        return KEYTAB_KERBEROS_CONF;
+      }
+      if (baseConfig != null) return baseConfig.getAppConfigurationEntry(appName);
+      return(null);
+    }
+  }
+
   //
   // Helper methods
   //
@@ -249,9 +433,6 @@ public class ZKUtil {
   /**
    * Check if the specified node exists.  Sets no watches.
    *
-   * Returns true if node exists, false if not.  Returns an exception if there
-   * is an unexpected zookeeper exception.
-   *
    * @param zkw zk reference
    * @param znode path of node to watch
    * @return version of the node if it exists, -1 if does not exist
@@ -700,19 +881,29 @@ public class ZKUtil {
    */
   public static void setData(ZooKeeperWatcher zkw, String znode, byte [] data)
   throws KeeperException, KeeperException.NoNodeException {
-    setData(zkw, znode, data, -1);
+    setData(zkw, (SetData)ZKUtilOp.setData(znode, data));
   }
 
+  private static void setData(ZooKeeperWatcher zkw, SetData setData)
+  throws KeeperException, KeeperException.NoNodeException {
+    SetDataRequest sd = (SetDataRequest)toZooKeeperOp(zkw, setData).toRequestRecord();
+    setData(zkw, sd.getPath(), sd.getData(), sd.getVersion());
+  }
+
+  /**
+   * Returns whether secure authentication is enabled: either a JAAS login
+   * config is supplied via <code>java.security.auth.login.config</code>, or
+   * <code>hbase.security.authentication</code> is set to <code>kerberos</code>.
+   */
   public static boolean isSecureZooKeeper(Configuration conf) {
-    // TODO: We need a better check for security enabled ZooKeeper. Currently
-    // the secure ZooKeeper client is set up using a supplied JaaS
-    // configuration file. But if the system property for the JaaS
-    // configuration file is set, this may not be an exclusive indication
-    // that HBase should set ACLs on znodes. As an alternative, we could do
-    // this more like Hadoop and build a JaaS configuration programmatically
-    // based on a site conf setting. The scope of such a change will be
-    // addressed in HBASE-4791.
-    return (System.getProperty("java.security.auth.login.config") != null);
+    // hbase shell needs to use:
+    //    -Djava.security.auth.login.config=user-jaas.conf
+    // since each user has a different jaas.conf
+    if (System.getProperty("java.security.auth.login.config") != null)
+      return true;
+
+    // Master & RSs use hbase.zookeeper.client.*
+    return "kerberos".equalsIgnoreCase(conf.get("hbase.security.authentication"));
   }
 
   private static ArrayList<ACL> createACL(ZooKeeperWatcher zkw, String node) {
@@ -896,14 +1087,20 @@ public class ZKUtil {
    * @throws KeeperException if unexpected zookeeper exception
    */
   public static void createAndFailSilent(ZooKeeperWatcher zkw,
-      String znode)
+      String znode) throws KeeperException {
+    createAndFailSilent(zkw,
+      (CreateAndFailSilent)ZKUtilOp.createAndFailSilent(znode, new byte[0]));
+  }
+
+  private static void createAndFailSilent(ZooKeeperWatcher zkw, CreateAndFailSilent cafs)
   throws KeeperException {
+    CreateRequest create = (CreateRequest)toZooKeeperOp(zkw, cafs).toRequestRecord();
+    String znode = create.getPath();
     try {
       RecoverableZooKeeper zk = zkw.getRecoverableZooKeeper();
       waitForZKConnectionIfAuthenticating(zkw);
       if (zk.exists(znode, false) == null) {
-        zk.create(znode, new byte[0], createACL(zkw,znode),
-            CreateMode.PERSISTENT);
+        zk.create(znode, create.getData(), create.getAcl(), CreateMode.fromFlag(create.getFlags()));
       }
     } catch(KeeperException.NodeExistsException nee) {
     } catch(KeeperException.NoAuthException nee){
@@ -989,8 +1186,15 @@ public class ZKUtil {
    */
   public static void deleteNodeFailSilent(ZooKeeperWatcher zkw, String node)
   throws KeeperException {
+    deleteNodeFailSilent(zkw,
+      (DeleteNodeFailSilent)ZKUtilOp.deleteNodeFailSilent(node));
+  }
+
+  private static void deleteNodeFailSilent(ZooKeeperWatcher zkw,
+      DeleteNodeFailSilent dnfs) throws KeeperException {
+    DeleteRequest delete = (DeleteRequest)toZooKeeperOp(zkw, dnfs).toRequestRecord();
     try {
-      zkw.getRecoverableZooKeeper().delete(node, -1);
+      zkw.getRecoverableZooKeeper().delete(delete.getPath(), delete.getVersion());
     } catch(KeeperException.NoNodeException nne) {
     } catch(InterruptedException ie) {
       zkw.interruptedException(ie);
@@ -1038,6 +1242,209 @@ public class ZKUtil {
     }
   }
 
+  /**
+   * Represents an action taken by ZKUtil, e.g. createAndFailSilent.
+   * These actions are higher-level than ZooKeeper {@link Op} actions, which represent
+   * individual actions in the ZooKeeper API, like create.
+   */
+  public abstract static class ZKUtilOp {
+    private String path;
+
+    private ZKUtilOp(String path) {
+      this.path = path;
+    }
+
+    /**
+     * @return a createAndFailSilent ZKUtilOp
+     */
+    public static ZKUtilOp createAndFailSilent(String path, byte[] data) {
+      return new CreateAndFailSilent(path, data);
+    }
+
+    /**
+     * @return a deleteNodeFailSilent ZKUtilOp
+     */
+    public static ZKUtilOp deleteNodeFailSilent(String path) {
+      return new DeleteNodeFailSilent(path);
+    }
+
+    /**
+     * @return a setData ZKUtilOp
+     */
+    public static ZKUtilOp setData(String path, byte [] data) {
+      return new SetData(path, data);
+    }
+
+    /**
+     * @return path to znode where the ZKUtilOp will occur
+     */
+    public String getPath() {
+      return path;
+    }
+
+    /**
+     * ZKUtilOp representing createAndFailSilent in ZooKeeper
+     * (attempt to create node, ignore error if already exists)
+     */
+    public static class CreateAndFailSilent extends ZKUtilOp {
+      private byte [] data;
+
+      private CreateAndFailSilent(String path, byte [] data) {
+        super(path);
+        this.data = data;
+      }
+
+      public byte[] getData() {
+        return data;
+      }
+
+      @Override
+      public boolean equals(Object o) {
+        if (this == o) return true;
+        if (!(o instanceof CreateAndFailSilent)) return false;
+
+        CreateAndFailSilent op = (CreateAndFailSilent) o;
+        return getPath().equals(op.getPath()) && Arrays.equals(data, op.data);
+      }
+    }
+
+    /**
+     * ZKUtilOp representing deleteNodeFailSilent in ZooKeeper
+     * (attempt to delete node, ignore error if node doesn't exist)
+     */
+    public static class DeleteNodeFailSilent extends ZKUtilOp {
+      private DeleteNodeFailSilent(String path) {
+        super(path);
+      }
+
+      @Override
+      public boolean equals(Object o) {
+        if (this == o) return true;
+        if (!(o instanceof DeleteNodeFailSilent)) return false;
+
+        return super.equals(o);
+      }
+    }
+
+    /**
+     * ZKUtilOp representing setData in ZooKeeper
+     */
+    public static class SetData extends ZKUtilOp {
+      private byte [] data;
+
+      private SetData(String path, byte [] data) {
+        super(path);
+        this.data = data;
+      }
+
+      public byte[] getData() {
+        return data;
+      }
+
+      @Override
+      public boolean equals(Object o) {
+        if (this == o) return true;
+        if (!(o instanceof SetData)) return false;
+
+        SetData op = (SetData) o;
+        return getPath().equals(op.getPath()) && Arrays.equals(data, op.data);
+      }
+    }
+  }
+
+  /**
+   * Convert from a ZKUtilOp to the equivalent ZooKeeper {@link Op}
+   */
+  private static Op toZooKeeperOp(ZooKeeperWatcher zkw, ZKUtilOp op)
+  throws UnsupportedOperationException {
+    if(op == null) return null;
+
+    if (op instanceof CreateAndFailSilent) {
+      CreateAndFailSilent cafs = (CreateAndFailSilent)op;
+      return Op.create(cafs.getPath(), cafs.getData(), createACL(zkw, cafs.getPath()),
+        CreateMode.PERSISTENT);
+    } else if (op instanceof DeleteNodeFailSilent) {
+      DeleteNodeFailSilent dnfs = (DeleteNodeFailSilent)op;
+      return Op.delete(dnfs.getPath(), -1);
+    } else if (op instanceof SetData) {
+      SetData sd = (SetData)op;
+      return Op.setData(sd.getPath(), sd.getData(), -1);
+    } else {
+      throw new UnsupportedOperationException("Unexpected ZKUtilOp type: "
+        + op.getClass().getName());
+    }
+  }
+
+  /**
+   * If hbase.zookeeper.useMulti is true, use ZooKeeper's multi-update functionality.
+   * Otherwise, run the list of operations sequentially.
+   *
+   * If all of the following are true:
+   * - runSequentialOnMultiFailure is true
+   * - hbase.zookeeper.useMulti is true
+   * - on calling multi, we get a ZooKeeper exception that can be handled by a sequential call(*)
+   * Then:
+   * - we retry the operations one-by-one (sequentially)
+   *
+   * Note *: an example is receiving a NodeExistsException from a "create" call.  Without multi,
+   * a user could call "createAndFailSilent" to ensure that a node exists if they don't care who
+   * actually created the node (i.e. the NodeExistsException from ZooKeeper is caught).
+   * This will cause all operations in the multi to fail, however, because
+   * the NodeExistsException that zk.create throws will fail the multi transaction.
+   * In this case, if the previous conditions hold, the commands are run sequentially, which should
+   * result in the correct final state, but means that the operations will not run atomically.
+   *
+   * @throws KeeperException
+   */
+  public static void multiOrSequential(ZooKeeperWatcher zkw, List<ZKUtilOp> ops,
+      boolean runSequentialOnMultiFailure) throws KeeperException {
+    if (ops == null) return;
+    boolean useMulti = zkw.getConfiguration().getBoolean(HConstants.ZOOKEEPER_USEMULTI, false);
+
+    if (useMulti) {
+      List<Op> zkOps = new LinkedList<Op>();
+      for (ZKUtilOp op : ops) {
+        zkOps.add(toZooKeeperOp(zkw, op));
+      }
+      try {
+        zkw.getRecoverableZooKeeper().multi(zkOps);
+      } catch (KeeperException ke) {
+       switch (ke.code()) {
+         case NODEEXISTS:
+         case NONODE:
+         case BADVERSION:
+         case NOAUTH:
+           // if we get an exception that could be solved by running sequentially
+           // (and the client asked us to), then break out and run sequentially
+           if (runSequentialOnMultiFailure) {
+             LOG.info("On call to ZK.multi, received exception: " + ke.toString() + "."
+               + "  Attempting to run operations sequentially because"
+               + " runSequentialOnMultiFailure is: " + runSequentialOnMultiFailure + ".");
+             break;
+           }
+          default:
+            throw ke;
+        }
+      } catch (InterruptedException ie) {
+        zkw.interruptedException(ie);
+      }
+    }
+
+    // run sequentially
+    for (ZKUtilOp op : ops) {
+      if (op instanceof CreateAndFailSilent) {
+        createAndFailSilent(zkw, (CreateAndFailSilent)op);
+      } else if (op instanceof DeleteNodeFailSilent) {
+        deleteNodeFailSilent(zkw, (DeleteNodeFailSilent)op);
+      } else if (op instanceof SetData) {
+        setData(zkw, (SetData)op);
+      } else {
+        throw new UnsupportedOperationException("Unexpected ZKUtilOp type: "
+          + op.getClass().getName());
+      }
+    }
+  }
+
   //
   // ZooKeeper cluster information
   //

Modified: hbase/branches/0.94-test/src/main/resources/hbase-default.xml
URL: http://svn.apache.org/viewvc/hbase/branches/0.94-test/src/main/resources/hbase-default.xml?rev=1425525&r1=1425524&r2=1425525&view=diff
==============================================================================
--- hbase/branches/0.94-test/src/main/resources/hbase-default.xml (original)
+++ hbase/branches/0.94-test/src/main/resources/hbase-default.xml Sun Dec 23 20:54:12 2012
@@ -690,6 +690,17 @@
     for more information.
     </description>
   </property>
+  <property>
+    <name>hbase.zookeeper.useMulti</name>
+    <value>false</value>
+    <description>Instructs HBase to make use of ZooKeeper's multi-update functionality.
+    This allows certain ZooKeeper operations to complete more quickly and prevents some issues
+    with rare ZooKeeper failure scenarios (see the release note of HBASE-6710 for an example).
+    IMPORTANT: only set this to true if all ZooKeeper servers in the cluster are on version 3.4+
+    and will not be downgraded.  ZooKeeper versions before 3.4 do not support multi-update and will
+    not fail gracefully if multi-update is invoked (see ZOOKEEPER-1495).
+    </description>
+  </property>
   <!-- End of properties used to generate ZooKeeper host:port quorum list. -->
 
   <!--



Mime
View raw message