hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jmhs...@apache.org
Subject svn commit: r1445918 [9/29] - in /hbase/branches/hbase-7290: ./ bin/ conf/ dev-support/ hbase-client/ hbase-common/ hbase-common/src/main/java/org/apache/hadoop/hbase/ hbase-common/src/main/java/org/apache/hadoop/hbase/io/compress/ hbase-common/src/mai...
Date Wed, 13 Feb 2013 20:58:32 GMT
Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServer.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServer.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServer.java Wed Feb 13 20:58:23 2013
@@ -19,14 +19,11 @@
 
 package org.apache.hadoop.hbase.ipc;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
 
 @InterfaceAudience.Private
 public class MetricsHBaseServer {
-  private static Log LOG = LogFactory.getLog(MetricsHBaseServer.class);
   private MetricsHBaseServerSource source;
 
   public MetricsHBaseServer(String serverName, MetricsHBaseServerWrapper wrapper) {
@@ -69,4 +66,4 @@ public class MetricsHBaseServer {
   public MetricsHBaseServerSource getMetricsSource() {
     return source;
   }
-}
+}
\ No newline at end of file

Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RequestContext.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RequestContext.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RequestContext.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RequestContext.java Wed Feb 13 20:58:23 2013
@@ -21,6 +21,7 @@
 package org.apache.hadoop.hbase.ipc;
 
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.IpcProtocol;
 import org.apache.hadoop.hbase.security.User;
 
 import java.net.InetAddress;
@@ -90,7 +91,7 @@ public class RequestContext {
    */
   public static void set(User user,
       InetAddress remoteAddress,
-      Class<? extends VersionedProtocol> protocol) {
+      Class<? extends IpcProtocol> protocol) {
     RequestContext ctx = instance.get();
     ctx.user = user;
     ctx.remoteAddress = remoteAddress;
@@ -111,12 +112,12 @@ public class RequestContext {
 
   private User user;
   private InetAddress remoteAddress;
-  private Class<? extends VersionedProtocol> protocol;
+  private Class<? extends IpcProtocol> protocol;
   // indicates we're within a RPC request invocation
   private boolean inRequest;
 
   private RequestContext(User user, InetAddress remoteAddr,
-      Class<? extends VersionedProtocol> protocol) {
+      Class<? extends IpcProtocol> protocol) {
     this.user = user;
     this.remoteAddress = remoteAddr;
     this.protocol = protocol;
@@ -130,11 +131,11 @@ public class RequestContext {
     return remoteAddress;
   }
 
-  public Class<? extends VersionedProtocol> getProtocol() {
+  public Class<? extends IpcProtocol> getProtocol() {
     return protocol;
   }
 
   public boolean isInRequest() {
     return inRequest;
   }
-}
+}
\ No newline at end of file

Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java Wed Feb 13 20:58:23 2013
@@ -23,16 +23,18 @@ import com.google.common.base.Function;
 import com.google.protobuf.Message;
 
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.IpcProtocol;
 import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequestBody;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
 
-/**
- */
 @InterfaceAudience.Private
 public interface RpcServer {
+  // TODO: Needs cleanup.  Why a 'start', and then a 'startThreads' and an 'openServer'?
+  // Also, the call takes a RpcRequestBody, an already composed combination of
+  // rpc Request and metadata.  Should disentangle metadata and rpc Request Message.
 
   void setSocketSendBufSize(int size);
 
@@ -45,12 +47,12 @@ public interface RpcServer {
   InetSocketAddress getListenerAddress();
 
   /** Called for each call.
-   * @param param writable parameter
+   * @param param parameter
    * @param receiveTime time
-   * @return Message
+   * @return Message Protobuf response Message
    * @throws java.io.IOException e
    */
-  Message call(Class<? extends VersionedProtocol> protocol,
+  Message call(Class<? extends IpcProtocol> protocol,
       RpcRequestBody param, long receiveTime, MonitoredRPCHandler status)
       throws IOException;
 
@@ -62,9 +64,8 @@ public interface RpcServer {
 
   void startThreads();
 
-
   /**
    * Returns the metrics instance for reporting RPC call statistics
    */
   MetricsHBaseServer getMetrics();
-}
+}
\ No newline at end of file

Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerRpcController.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerRpcController.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerRpcController.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerRpcController.java Wed Feb 13 20:58:23 2013
@@ -121,4 +121,13 @@ public class ServerRpcController impleme
   public boolean failedOnException() {
     return serviceException != null;
   }
+
+  /**
+   * Throws an IOException back out if one is currently stored.
+   */
+  public void checkFailed() throws IOException {
+    if (failedOnException()) {
+      throw getFailedOn();
+    }
+  }
 }

Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableSplit.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableSplit.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableSplit.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableSplit.java Wed Feb 13 20:58:23 2013
@@ -106,7 +106,20 @@ public class TableSplit implements Input
       Bytes.toStringBinary(m_startRow) + "," + Bytes.toStringBinary(m_endRow);
   }
 
+  @Override
   public int compareTo(TableSplit o) {
     return Bytes.compareTo(getStartRow(), o.getStartRow());
   }
+
+  @Override
+  public boolean equals(Object o) {
+    if (o == null || !(o instanceof TableSplit)) {
+      return false;
+    }
+    TableSplit other = (TableSplit)o;
+    return Bytes.equals(m_tableName, other.m_tableName) &&
+      Bytes.equals(m_startRow, other.m_startRow) &&
+      Bytes.equals(m_endRow, other.m_endRow) &&
+      m_regionLocation.equals(other.m_regionLocation);
+  }
 }

Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Export.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Export.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Export.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Export.java Wed Feb 13 20:58:23 2013
@@ -20,6 +20,8 @@ package org.apache.hadoop.hbase.mapreduc
 
 import java.io.IOException;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
@@ -27,19 +29,18 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.filter.PrefixFilter;
-import org.apache.hadoop.hbase.filter.RowFilter;
-import org.apache.hadoop.hbase.filter.RegexStringComparator;
 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
 import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.IncompatibleFilterException;
+import org.apache.hadoop.hbase.filter.PrefixFilter;
+import org.apache.hadoop.hbase.filter.RegexStringComparator;
+import org.apache.hadoop.hbase.filter.RowFilter;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
 import org.apache.hadoop.util.GenericOptionsParser;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 
 /**
 * Export an HBase table.
@@ -137,10 +138,10 @@ public class Export {
 
     int batching = conf.getInt(EXPORT_BATCHING, -1);
     if (batching !=  -1){
-      try{
+      try {
         s.setBatch(batching);
-	} catch (RuntimeException e) {
-	    LOG.error("Batching could not be set", e);
+      } catch (IncompatibleFilterException e) {
+        LOG.error("Batching could not be set", e);
       }
     }
     LOG.info("versions=" + versions + ", starttime=" + startTime +

Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java Wed Feb 13 20:58:23 2013
@@ -49,17 +49,18 @@ import org.apache.hadoop.hbase.client.Pu
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
+import org.apache.hadoop.hbase.io.hfile.AbstractHFileWriter;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
 import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;
 import org.apache.hadoop.hbase.io.hfile.NoOpDataBlockEncoder;
 import org.apache.hadoop.hbase.regionserver.HStore;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
-import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
+import org.apache.hadoop.hbase.regionserver.BloomType;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
@@ -82,7 +83,7 @@ import org.apache.hadoop.mapreduce.lib.p
 public class HFileOutputFormat extends FileOutputFormat<ImmutableBytesWritable, KeyValue> {
   static Log LOG = LogFactory.getLog(HFileOutputFormat.class);
   static final String COMPRESSION_CONF_KEY = "hbase.hfileoutputformat.families.compression";
-  TimeRangeTracker trt = new TimeRangeTracker();
+  private static final String BLOOM_TYPE_CONF_KEY = "hbase.hfileoutputformat.families.bloomtype";
   private static final String DATABLOCK_ENCODING_CONF_KEY =
      "hbase.mapreduce.hfileoutputformat.datablock.encoding";
 
@@ -106,6 +107,7 @@ public class HFileOutputFormat extends F
 
     // create a map from column family to the compression algorithm
     final Map<byte[], String> compressionMap = createFamilyCompressionMap(conf);
+    final Map<byte[], String> bloomTypeMap = createFamilyBloomMap(conf);
 
     String dataBlockEncodingStr = conf.get(DATABLOCK_ENCODING_CONF_KEY);
     final HFileDataBlockEncoder encoder;
@@ -166,7 +168,6 @@ public class HFileOutputFormat extends F
 
         // we now have the proper HLog writer. full steam ahead
         kv.updateLatestStamp(this.now);
-        trt.includeTimestamp(kv);
         wl.writer.append(kv);
         wl.written += length;
 
@@ -187,9 +188,9 @@ public class HFileOutputFormat extends F
         this.rollRequested = false;
       }
 
-      /* Create a new HFile.Writer.
+      /* Create a new StoreFile.Writer.
        * @param family
-       * @return A WriterLength, containing a new HFile.Writer.
+       * @return A WriterLength, containing a new StoreFile.Writer.
        * @throws IOException
        */
       private WriterLength getNewWriter(byte[] family, Configuration conf)
@@ -198,20 +199,28 @@ public class HFileOutputFormat extends F
         Path familydir = new Path(outputdir, Bytes.toString(family));
         String compression = compressionMap.get(family);
         compression = compression == null ? defaultCompression : compression;
-        wl.writer = HFile.getWriterFactoryNoCache(conf)
-            .withPath(fs, StoreFile.getUniqueFile(fs, familydir))
-            .withBlockSize(blocksize)
-            .withCompression(compression)
-            .withComparator(KeyValue.KEY_COMPARATOR)
+        String bloomTypeStr = bloomTypeMap.get(family);
+        BloomType bloomType = BloomType.NONE;
+        if (bloomTypeStr != null) {
+          bloomType = BloomType.valueOf(bloomTypeStr);
+        }
+        Configuration tempConf = new Configuration(conf);
+        tempConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f);
+        wl.writer = new StoreFile.WriterBuilder(conf, new CacheConfig(tempConf), fs, blocksize)
+            .withOutputDir(familydir)
+            .withCompression(AbstractHFileWriter.compressionByName(compression))
+            .withBloomType(bloomType)
+            .withComparator(KeyValue.COMPARATOR)
             .withDataBlockEncoder(encoder)
             .withChecksumType(HStore.getChecksumType(conf))
             .withBytesPerChecksum(HStore.getBytesPerChecksum(conf))
-            .create();
+            .build();
+        
         this.writers.put(family, wl);
         return wl;
       }
 
-      private void close(final HFile.Writer w) throws IOException {
+      private void close(final StoreFile.Writer w) throws IOException {
         if (w != null) {
           w.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY,
               Bytes.toBytes(System.currentTimeMillis()));
@@ -221,8 +230,7 @@ public class HFileOutputFormat extends F
               Bytes.toBytes(true));
           w.appendFileInfo(StoreFile.EXCLUDE_FROM_MINOR_COMPACTION_KEY,
               Bytes.toBytes(compactionExclude));
-          w.appendFileInfo(StoreFile.TIMERANGE_KEY,
-              WritableUtils.toByteArray(trt));
+          w.appendTrackedTimestampsToMetadata();
           w.close();
         }
       }
@@ -241,7 +249,7 @@ public class HFileOutputFormat extends F
    */
   static class WriterLength {
     long written = 0;
-    HFile.Writer writer = null;
+    StoreFile.Writer writer = null;
   }
 
   /**
@@ -359,7 +367,8 @@ public class HFileOutputFormat extends F
 
     // Set compression algorithms based on column families
     configureCompression(table, conf);
-
+    configureBloomType(table, conf);
+    
     TableMapReduceUtil.addDependencyJars(job);
     LOG.info("Incremental table output configured.");
   }
@@ -375,25 +384,39 @@ public class HFileOutputFormat extends F
    *         algorithm
    */
   static Map<byte[], String> createFamilyCompressionMap(Configuration conf) {
-    Map<byte[], String> compressionMap = new TreeMap<byte[], String>(Bytes.BYTES_COMPARATOR);
-    String compressionConf = conf.get(COMPRESSION_CONF_KEY, "");
-    for (String familyConf : compressionConf.split("&")) {
+    return createFamilyConfValueMap(conf, COMPRESSION_CONF_KEY);
+  }
+
+  private static Map<byte[], String> createFamilyBloomMap(Configuration conf) {
+    return createFamilyConfValueMap(conf, BLOOM_TYPE_CONF_KEY);
+  }
+  
+  /**
+   * Run inside the task to deserialize column family to given conf value map.
+   * 
+   * @param conf
+   * @param confName
+   * @return a map of column family to the given configuration value
+   */
+  private static Map<byte[], String> createFamilyConfValueMap(Configuration conf, String confName) {
+    Map<byte[], String> confValMap = new TreeMap<byte[], String>(Bytes.BYTES_COMPARATOR);
+    String confVal = conf.get(confName, "");
+    for (String familyConf : confVal.split("&")) {
       String[] familySplit = familyConf.split("=");
       if (familySplit.length != 2) {
         continue;
       }
-
       try {
-        compressionMap.put(URLDecoder.decode(familySplit[0], "UTF-8").getBytes(),
+        confValMap.put(URLDecoder.decode(familySplit[0], "UTF-8").getBytes(),
             URLDecoder.decode(familySplit[1], "UTF-8"));
       } catch (UnsupportedEncodingException e) {
         // will not happen with UTF-8 encoding
         throw new AssertionError(e);
       }
     }
-    return compressionMap;
+    return confValMap;
   }
-
+  
   /**
    * Serialize column family to compression algorithm map to configuration.
    * Invoked while configuring the MR job for incremental load.
@@ -403,6 +426,8 @@ public class HFileOutputFormat extends F
    * @throws IOException
    *           on failure to read column family descriptors
    */
+  @edu.umd.cs.findbugs.annotations.SuppressWarnings(
+      value="RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE")
   static void configureCompression(HTable table, Configuration conf) throws IOException {
     StringBuilder compressionConfigValue = new StringBuilder();
     HTableDescriptor tableDescriptor = table.getTableDescriptor();
@@ -423,4 +448,35 @@ public class HFileOutputFormat extends F
     // Get rid of the last ampersand
     conf.set(COMPRESSION_CONF_KEY, compressionConfigValue.toString());
   }
+  
+  /**
+   * Serialize column family to bloom type map to configuration.
+   * Invoked while configuring the MR job for incremental load.
+   *
+   * @throws IOException
+   *           on failure to read column family descriptors
+   */
+  static void configureBloomType(HTable table, Configuration conf) throws IOException {
+    HTableDescriptor tableDescriptor = table.getTableDescriptor();
+    if (tableDescriptor == null) {
+      // could happen with mock table instance
+      return;
+    }
+    StringBuilder bloomTypeConfigValue = new StringBuilder();
+    Collection<HColumnDescriptor> families = tableDescriptor.getFamilies();
+    int i = 0;
+    for (HColumnDescriptor familyDescriptor : families) {
+      if (i++ > 0) {
+        bloomTypeConfigValue.append('&');
+      }
+      bloomTypeConfigValue.append(URLEncoder.encode(familyDescriptor.getNameAsString(), "UTF-8"));
+      bloomTypeConfigValue.append('=');
+      String bloomType = familyDescriptor.getBloomFilterType().toString();
+      if (bloomType == null) {
+        bloomType = HColumnDescriptor.DEFAULT_BLOOMFILTER;
+      }
+      bloomTypeConfigValue.append(URLEncoder.encode(bloomType, "UTF-8"));
+    }
+    conf.set(BLOOM_TYPE_CONF_KEY, bloomTypeConfigValue.toString());
+  }
 }

Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HLogInputFormat.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HLogInputFormat.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HLogInputFormat.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HLogInputFormat.java Wed Feb 13 20:58:23 2013
@@ -35,7 +35,6 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
 import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
-import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.InputFormat;
@@ -49,10 +48,10 @@ import org.apache.hadoop.mapreduce.TaskA
  */
 @InterfaceAudience.Public
 public class HLogInputFormat extends InputFormat<HLogKey, WALEdit> {
-  private static Log LOG = LogFactory.getLog(HLogInputFormat.class);
+  private static final Log LOG = LogFactory.getLog(HLogInputFormat.class);
 
-  public static String START_TIME_KEY = "hlog.start.time";
-  public static String END_TIME_KEY = "hlog.end.time";
+  public static final String START_TIME_KEY = "hlog.start.time";
+  public static final String END_TIME_KEY = "hlog.end.time";
 
   /**
    * {@link InputSplit} for {@link HLog} files. Each split represent

Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java Wed Feb 13 20:58:23 2013
@@ -19,27 +19,41 @@
 package org.apache.hadoop.hbase.mapreduce;
 
 import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
+import java.util.UUID;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.ZooKeeperConnectionException;
 import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.HConnectionManager;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.zookeeper.KeeperException;
 
 /**
  * Import data written by {@link Export}.
@@ -47,9 +61,15 @@ import org.apache.hadoop.util.GenericOpt
 @InterfaceAudience.Public
 @InterfaceStability.Stable
 public class Import {
+  private static final Log LOG = LogFactory.getLog(Import.class);
   final static String NAME = "import";
   final static String CF_RENAME_PROP = "HBASE_IMPORTER_RENAME_CFS";
   final static String BULK_OUTPUT_CONF_KEY = "import.bulk.output";
+  final static String FILTER_CLASS_CONF_KEY = "import.filter.class";
+  final static String FILTER_ARGS_CONF_KEY = "import.filter.args";
+
+  // Optional filter to use for mappers
+  private static Filter filter;
 
   /**
    * A mapper that just writes out KeyValues.
@@ -72,6 +92,10 @@ public class Import {
     throws IOException {
       try {
         for (KeyValue kv : value.raw()) {
+          kv = filterKv(kv);
+          // skip if we filtered it out
+          if (kv == null) continue;
+
           context.write(row, convertKv(kv, cfRenameMap));
         }
       } catch (InterruptedException e) {
@@ -82,6 +106,7 @@ public class Import {
     @Override
     public void setup(Context context) {
       cfRenameMap = createCfRenameMap(context.getConfiguration());
+      filter = instantiateFilter(context.getConfiguration());
     }
   }
 
@@ -91,6 +116,7 @@ public class Import {
   static class Importer
   extends TableMapper<ImmutableBytesWritable, Mutation> {
     private Map<byte[], byte[]> cfRenameMap;
+    private UUID clusterId;
       
     /**
      * @param row  The current table row key.
@@ -116,6 +142,10 @@ public class Import {
       Put put = null;
       Delete delete = null;
       for (KeyValue kv : result.raw()) {
+        kv = filterKv(kv);
+        // skip if we filter it out
+        if (kv == null) continue;
+
         kv = convertKv(kv, cfRenameMap);
         // Deletes and Puts are gathered and written when finished
         if (kv.isDelete()) {
@@ -131,17 +161,106 @@ public class Import {
         }
       }
       if (put != null) {
+        put.setClusterId(clusterId);
         context.write(key, put);
       }
       if (delete != null) {
+        delete.setClusterId(clusterId);
         context.write(key, delete);
       }
     }
 
     @Override
     public void setup(Context context) {
-      cfRenameMap = createCfRenameMap(context.getConfiguration());
+      Configuration conf = context.getConfiguration();
+      cfRenameMap = createCfRenameMap(conf);
+      filter = instantiateFilter(conf);
+
+      try {
+        HConnection connection = HConnectionManager.getConnection(conf);
+        ZooKeeperWatcher zkw = connection.getZooKeeperWatcher();
+        ReplicationZookeeper zkHelper = new ReplicationZookeeper(connection, conf, zkw);
+        clusterId = zkHelper.getUUIDForCluster(zkw);
+      } catch (ZooKeeperConnectionException e) {
+        LOG.error("Problem connecting to ZooKeper during task setup", e);
+      } catch (KeeperException e) {
+        LOG.error("Problem reading ZooKeeper data during task setup", e);
+      } catch (IOException e) {
+        LOG.error("Problem setting up task", e);
+      }
+
+    }
+  }
+
+  /**
+   * Create a {@link Filter} to apply to all incoming keys ({@link KeyValue KeyValues}) to
+   * optionally not include in the job output
+   * @param conf {@link Configuration} from which to load the filter
+   * @return the filter to use for the task, or <tt>null</tt> if no filter to should be used
+   * @throws IllegalArgumentException if the filter is misconfigured
+   */
+  private static Filter instantiateFilter(Configuration conf) {
+    // get the filter, if it was configured
+    Class<? extends Filter> filterClass = conf.getClass(FILTER_CLASS_CONF_KEY, null, Filter.class);
+    if (filterClass == null) {
+      LOG.debug("No configured filter class, accepting all keyvalues.");
+      return null;
+    }
+    LOG.debug("Attempting to create filter:" + filterClass);
+
+    try {
+      Method m = filterClass.getMethod("createFilterFromArguments", ArrayList.class);
+      return (Filter) m.invoke(null, getFilterArgs(conf));
+    } catch (IllegalAccessException e) {
+      LOG.error("Couldn't instantiate filter!", e);
+      throw new RuntimeException(e);
+    } catch (SecurityException e) {
+      LOG.error("Couldn't instantiate filter!", e);
+      throw new RuntimeException(e);
+    } catch (NoSuchMethodException e) {
+      LOG.error("Couldn't instantiate filter!", e);
+      throw new RuntimeException(e);
+    } catch (IllegalArgumentException e) {
+      LOG.error("Couldn't instantiate filter!", e);
+      throw new RuntimeException(e);
+    } catch (InvocationTargetException e) {
+      LOG.error("Couldn't instantiate filter!", e);
+      throw new RuntimeException(e);
+    }
+  }
+
+  private static ArrayList<byte[]> getFilterArgs(Configuration conf) {
+    ArrayList<byte[]> args = new ArrayList<byte[]>();
+    String[] sargs = conf.getStrings(FILTER_ARGS_CONF_KEY);
+    for (String arg : sargs) {
+      // all the filters' instantiation methods expected quoted args since they are coming from
+      // the shell, so add them here, though its shouldn't really be needed :-/
+      args.add(Bytes.toBytes("'" + arg + "'"));
+    }
+    return args;
+  }
+
+  /**
+   * Attempt to filter out the keyvalue
+   * @param kv {@link KeyValue} on which to apply the filter
+   * @return <tt>null</tt> if the key should not be written, otherwise returns the original
+   *         {@link KeyValue}
+   */
+  private static KeyValue filterKv(KeyValue kv) {
+    // apply the filter and skip this kv if the filter doesn't apply
+    if (filter != null) {
+      Filter.ReturnCode code = filter.filterKeyValue(kv);
+      System.out.println("Filter returned:" + code);
+      // if its not an accept type, then skip this kv
+      if (!(code.equals(Filter.ReturnCode.INCLUDE) || code
+          .equals(Filter.ReturnCode.INCLUDE_AND_NEXT_COL))) {
+        if (LOG.isDebugEnabled()) {
+          System.out.println("Skipping key: " + kv + " from filter decision: " + code);
+        }
+        return null;
+      }
     }
+    return kv;
   }
 
   // helper: create a new KeyValue based on CF rename map
@@ -223,13 +342,33 @@ public class Import {
     }
     conf.set(CF_RENAME_PROP, sb.toString());
   }
-  
+
+  /**
+   * Add a Filter to be instantiated on import
+   * @param conf Configuration to update (will be passed to the job)
+   * @param clazz {@link Filter} subclass to instantiate on the server.
+   * @param args List of arguments to pass to the filter on instantiation
+   */
+  public static void addFilterAndArguments(Configuration conf, Class<? extends Filter> clazz,
+      List<String> args) {
+    conf.set(Import.FILTER_CLASS_CONF_KEY, clazz.getName());
+
+    // build the param string for the key
+    StringBuilder builder = new StringBuilder();
+    for (int i = 0; i < args.size(); i++) {
+      String arg = args.get(i);
+      builder.append(arg);
+      if (i != args.size() - 1) {
+        builder.append(",");
+      }
+    }
+    conf.set(Import.FILTER_ARGS_CONF_KEY, builder.toString());
+  }
 
   /**
    * Sets up the actual job.
-   *
-   * @param conf  The current configuration.
-   * @param args  The command line parameters.
+   * @param conf The current configuration.
+   * @param args The command line parameters.
    * @return The newly created job.
    * @throws IOException When setting up the job fails.
    */
@@ -242,6 +381,17 @@ public class Import {
     FileInputFormat.setInputPaths(job, inputDir);
     job.setInputFormatClass(SequenceFileInputFormat.class);
     String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY);
+
+    // make sure we get the filter in the jars
+    try {
+      Class<? extends Filter> filter = conf.getClass(FILTER_CLASS_CONF_KEY, null, Filter.class);
+      if (filter != null) {
+        TableMapReduceUtil.addDependencyJars(conf, filter);
+      }
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+
     if (hfileOutPath != null) {
       job.setMapperClass(KeyValueImporter.class);
       HTable table = new HTable(conf, tableName);
@@ -274,6 +424,15 @@ public class Import {
     System.err.println("By default Import will load data directly into HBase. To instead generate");
     System.err.println("HFiles of data to prepare for a bulk data load, pass the option:");
     System.err.println("  -D" + BULK_OUTPUT_CONF_KEY + "=/path/for/output");
+    System.err
+        .println(" To apply a generic org.apache.hadoop.hbase.filter.Filter to the input, use");
+    System.err.println("  -D" + FILTER_CLASS_CONF_KEY + "=<name of filter class>");
+    System.err.println("  -D" + FILTER_ARGS_CONF_KEY + "=<comma separated list of args for filter");
+    System.err.println(" NOTE: The filter will be applied BEFORE doing key renames via the "
+        + CF_RENAME_PROP + " property. Futher, filters will only use the"
+        + "Filter#filterKeyValue(KeyValue) method to determine if the KeyValue should be added;"
+        + " Filter.ReturnCode#INCLUDE and #INCLUDE_AND_NEXT_COL will be considered as including "
+        + "the KeyValue.");
     System.err.println("For performance consider the following options:\n"
         + "  -Dmapred.map.tasks.speculative.execution=false\n"
         + "  -Dmapred.reduce.tasks.speculative.execution=false");

Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java Wed Feb 13 20:58:23 2013
@@ -61,6 +61,7 @@ import org.apache.hadoop.hbase.client.HB
 import org.apache.hadoop.hbase.client.HConnection;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.ServerCallable;
+import org.apache.hadoop.hbase.client.coprocessor.SecureBulkLoadClient;
 import org.apache.hadoop.hbase.io.HalfStoreFileReader;
 import org.apache.hadoop.hbase.io.Reference;
 import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
@@ -71,11 +72,13 @@ import org.apache.hadoop.hbase.io.hfile.
 import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;
 import org.apache.hadoop.hbase.io.hfile.HFileScanner;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.regionserver.BloomType;
 import org.apache.hadoop.hbase.regionserver.HStore;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
-import org.apache.hadoop.hbase.regionserver.StoreFile.BloomType;
+import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 
@@ -91,19 +94,30 @@ import com.google.common.util.concurrent
 @InterfaceAudience.Public
 @InterfaceStability.Stable
 public class LoadIncrementalHFiles extends Configured implements Tool {
-  private static Log LOG = LogFactory.getLog(LoadIncrementalHFiles.class);
-  static AtomicLong regionCount = new AtomicLong(0);
+  private static final Log LOG = LogFactory.getLog(LoadIncrementalHFiles.class);
+  static final AtomicLong regionCount = new AtomicLong(0);
   private HBaseAdmin hbAdmin;
   private Configuration cfg;
 
-  public static String NAME = "completebulkload";
-  private static String ASSIGN_SEQ_IDS = "hbase.mapreduce.bulkload.assign.sequenceNumbers";
+  public static final String NAME = "completebulkload";
+  private static final String ASSIGN_SEQ_IDS = "hbase.mapreduce.bulkload.assign.sequenceNumbers";
   private boolean assignSeqIds;
 
-  public LoadIncrementalHFiles(Configuration conf) throws Exception {
+  private boolean useSecure;
+  private Token<?> userToken;
+  private String bulkToken;
+
+  //package private for testing
+  LoadIncrementalHFiles(Configuration conf, Boolean useSecure) throws Exception {
     super(conf);
     this.cfg = conf;
     this.hbAdmin = new HBaseAdmin(conf);
+    //added simple for testing
+    this.useSecure = useSecure != null ? useSecure : User.isHBaseSecurityEnabled(conf);
+  }
+
+  public LoadIncrementalHFiles(Configuration conf) throws Exception {
+    this(conf, null);
     assignSeqIds = conf.getBoolean(ASSIGN_SEQ_IDS, true);
   }
 
@@ -215,6 +229,18 @@ public class LoadIncrementalHFiles exten
         return;
       }
 
+      //If using secure bulk load
+      //prepare staging directory and token
+      if(useSecure) {
+        FileSystem fs = FileSystem.get(cfg);
+        //This condition is here for unit testing
+        //Since delegation token doesn't work in mini cluster
+        if(User.isSecurityEnabled()) {
+         userToken = fs.getDelegationToken("renewer");
+        }
+        bulkToken = new SecureBulkLoadClient(table).prepareBulkLoad(table.getTableName());
+      }
+
       // Assumes that region splits can happen while this occurs.
       while (!queue.isEmpty()) {
         // need to reload split keys each iteration.
@@ -243,6 +269,18 @@ public class LoadIncrementalHFiles exten
       }
 
     } finally {
+      if(useSecure) {
+        if(userToken != null) {
+          try {
+            userToken.cancel(cfg);
+          } catch (Exception e) {
+            LOG.warn("Failed to cancel HDFS delegation token.", e);
+          }
+        }
+        if(bulkToken != null) {
+          new SecureBulkLoadClient(table).cleanupBulkLoad(bulkToken);
+        }
+      }
       pool.shutdown();
       if (queue != null && !queue.isEmpty()) {
         StringBuilder err = new StringBuilder();
@@ -476,11 +514,47 @@ public class LoadIncrementalHFiles exten
         tableName, first) {
       @Override
       public Boolean call() throws Exception {
-        LOG.debug("Going to connect to server " + location + " for row "
-            + Bytes.toStringBinary(row));
-        byte[] regionName = location.getRegionInfo().getRegionName();
-        return ProtobufUtil.bulkLoadHFile(server, famPaths, regionName,
-            assignSeqIds);
+        SecureBulkLoadClient secureClient = null;
+        boolean success = false;
+
+        try {
+          LOG.debug("Going to connect to server " + location + " for row "
+              + Bytes.toStringBinary(row));
+          byte[] regionName = location.getRegionInfo().getRegionName();
+          if(!useSecure) {
+            success = ProtobufUtil.bulkLoadHFile(server, famPaths, regionName, assignSeqIds);
+          } else {
+            HTable table = new HTable(conn.getConfiguration(), tableName);
+            secureClient = new SecureBulkLoadClient(table);
+            success = secureClient.bulkLoadHFiles(famPaths, userToken, bulkToken, location.getRegionInfo().getStartKey());
+          }
+          return success;
+        } finally {
+          //Best effort copying of files that might not have been imported
+          //from the staging directory back to original location
+          //in user directory
+          if(secureClient != null && !success) {
+            FileSystem fs = FileSystem.get(cfg);
+            for(Pair<byte[], String> el : famPaths) {
+              Path hfileStagingPath = null;
+              Path hfileOrigPath = new Path(el.getSecond());
+              try {
+                hfileStagingPath= new Path(secureClient.getStagingPath(bulkToken, el.getFirst()),
+                    hfileOrigPath.getName());
+                if(fs.rename(hfileStagingPath, hfileOrigPath)) {
+                  LOG.debug("Moved back file " + hfileOrigPath + " from " +
+                      hfileStagingPath);
+                } else if(fs.exists(hfileStagingPath)){
+                  LOG.debug("Unable to move back file " + hfileOrigPath + " from " +
+                      hfileStagingPath);
+                }
+              } catch(Exception ex) {
+                LOG.debug("Unable to move back file " + hfileOrigPath + " from " +
+                    hfileStagingPath, ex);
+              }
+            }
+          }
+        }
       }
     };
 
@@ -626,11 +700,11 @@ public class LoadIncrementalHFiles exten
     }
 
     HTableDescriptor htd = new HTableDescriptor(tableName);
-    HColumnDescriptor hcd = null;
+    HColumnDescriptor hcd;
 
     // Add column families
     // Build a set of keys
-    byte[][] keys = null;
+    byte[][] keys;
     TreeMap<byte[], Integer> map = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
     
     for (FileStatus stat : familyDirStatuses) {
@@ -667,10 +741,10 @@ public class LoadIncrementalHFiles exten
             " last="  + Bytes.toStringBinary(last));
           
           // To eventually infer start key-end key boundaries
-          Integer value = map.containsKey(first)?(Integer)map.get(first):0;
+          Integer value = map.containsKey(first)? map.get(first):0;
           map.put(first, value+1);
 
-          value = map.containsKey(last)?(Integer)map.get(last):0;
+          value = map.containsKey(last)? map.get(last):0;
           map.put(last, value-1);
         }  finally {
           reader.close();

Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java Wed Feb 13 20:58:23 2013
@@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.io.Immuta
 import org.apache.hadoop.hbase.util.Addressing;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.util.Strings;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.JobContext;
@@ -219,7 +220,8 @@ extends InputFormat<ImmutableBytesWritab
   private String reverseDNS(InetAddress ipAddress) throws NamingException {
     String hostName = this.reverseDNSCacheMap.get(ipAddress);
     if (hostName == null) {
-      hostName = DNS.reverseDns(ipAddress, this.nameServer);
+      hostName = Strings.domainNamePointerToHostName(
+        DNS.reverseDns(ipAddress, this.nameServer));
       this.reverseDNSCacheMap.put(ipAddress, hostName);
     }
     return hostName;

Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java Wed Feb 13 20:58:23 2013
@@ -126,7 +126,7 @@ public class WALPlayer extends Configure
           Delete del = null;
           KeyValue lastKV = null;
           for (KeyValue kv : value.getKeyValues()) {
-            // filtering HLog meta entries, see HLog.completeCacheFlushLogEdit
+            // filtering HLog meta entries
             if (HLogUtil.isMetaFamily(kv.getFamily())) continue;
 
             // A WALEdit may contain multiple operations (HBASE-3584) and/or

Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java Wed Feb 13 20:58:23 2013
@@ -21,7 +21,6 @@ package org.apache.hadoop.hbase.master;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -40,7 +39,6 @@ import java.util.concurrent.atomic.Atomi
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
-import com.google.common.collect.LinkedHashMultimap;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -85,6 +83,8 @@ import org.apache.zookeeper.KeeperExcept
 import org.apache.zookeeper.KeeperException.NodeExistsException;
 import org.apache.zookeeper.data.Stat;
 
+import com.google.common.collect.LinkedHashMultimap;
+
 /**
  * Manages and performs region assignment.
  * <p>
@@ -162,8 +162,10 @@ public class AssignmentManager extends Z
    * that ServerShutdownHandler can be fully enabled and re-assign regions
    * of dead servers. So that when re-assignment happens, AssignmentManager
    * has proper region states.
+   *
+   * Protected to ease testing.
    */
-  final AtomicBoolean failoverCleanupDone = new AtomicBoolean(false);
+  protected final AtomicBoolean failoverCleanupDone = new AtomicBoolean(false);
 
   /**
    * Constructs a new assignment manager.
@@ -610,7 +612,7 @@ public class AssignmentManager extends Z
    */
   private void handleRegion(final RegionTransition rt, int expectedVersion) {
     if (rt == null) {
-      LOG.warn("Unexpected NULL input " + rt);
+      LOG.warn("Unexpected NULL input for RegionTransition rt");
       return;
     }
     final ServerName sn = rt.getServerName();
@@ -1059,13 +1061,27 @@ public class AssignmentManager extends Z
               ZKUtil.listChildrenAndWatchForNewChildren(
                 watcher, watcher.assignmentZNode);
             if (children != null) {
+              Stat stat = new Stat();
               for (String child : children) {
                 // if region is in transition, we already have a watch
                 // on it, so no need to watch it again. So, as I know for now,
                 // this is needed to watch splitting nodes only.
                 if (!regionStates.isRegionInTransition(child)) {
-                  ZKUtil.watchAndCheckExists(watcher,
-                    ZKUtil.joinZNode(watcher.assignmentZNode, child));
+                  stat.setVersion(0);
+                  byte[] data = ZKAssign.getDataAndWatch(watcher,
+                    ZKUtil.joinZNode(watcher.assignmentZNode, child), stat);
+                  if (data != null && stat.getVersion() > 0) {
+                    try {
+                      RegionTransition rt = RegionTransition.parseFrom(data);
+
+                      //See HBASE-7551, handle splitting too, in case we miss the node change event
+                      if (rt.getEventType() == EventType.RS_ZK_REGION_SPLITTING) {
+                        handleRegion(rt, stat.getVersion());
+                      }
+                    } catch (DeserializationException de) {
+                      LOG.error("error getting data for " + child, de);
+                    }
+                  }
                 }
               }
             }
@@ -1461,6 +1477,7 @@ public class AssignmentManager extends Z
           return;
         }
         // This never happens. Currently regionserver close always return true.
+        // Todo; this can now happen (0.96) if there is an exception in a coprocessor
         LOG.warn("Server " + server + " region CLOSE RPC returned false for " +
           region.getRegionNameAsString());
       } catch (Throwable t) {
@@ -2633,11 +2650,11 @@ public class AssignmentManager extends Z
     threadPoolExecutorService.submit(new UnAssignCallable(this, regionInfo));
   }
 
-  boolean isCarryingRoot(ServerName serverName) {
+  public boolean isCarryingRoot(ServerName serverName) {
     return isCarryingRegion(serverName, HRegionInfo.ROOT_REGIONINFO);
   }
 
-  boolean isCarryingMeta(ServerName serverName) {
+  public boolean isCarryingMeta(ServerName serverName) {
     return isCarryingRegion(serverName, HRegionInfo.FIRST_META_REGIONINFO);
   }
 

Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/master/GeneralBulkAssigner.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/master/GeneralBulkAssigner.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/master/GeneralBulkAssigner.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/master/GeneralBulkAssigner.java Wed Feb 13 20:58:23 2013
@@ -38,7 +38,7 @@ import org.apache.hadoop.hbase.ServerNam
 
 /**
  * Run bulk assign.  Does one RCP per regionserver passing a
- * batch of regions using {@link SingleServerBulkAssigner}.
+ * batch of regions using {@link GeneralBulkAssigner.SingleServerBulkAssigner}.
  */
 @InterfaceAudience.Private
 public class GeneralBulkAssigner extends BulkAssigner {

Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java Wed Feb 13 20:58:23 2013
@@ -64,6 +64,7 @@ public class MasterCoprocessorHost
   private MasterServices masterServices;
 
   MasterCoprocessorHost(final MasterServices services, final Configuration conf) {
+    this.conf = conf;
     this.masterServices = services;
     loadSystemCoprocessors(conf, MASTER_COPROCESSOR_CONF_KEY);
   }

Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java Wed Feb 13 20:58:23 2013
@@ -22,6 +22,7 @@ import java.io.IOException;
 
 import com.google.protobuf.Service;
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.TableDescriptors;
@@ -80,6 +81,63 @@ public interface MasterServices extends 
       throws IOException;
 
   /**
+   * Delete a table
+   * @param tableName The table name
+   * @throws IOException
+   */
+  public void deleteTable(final byte[] tableName) throws IOException;
+
+  /**
+   * Modify the descriptor of an existing table
+   * @param tableName The table name
+   * @param descriptor The updated table descriptor
+   * @throws IOException
+   */
+  public void modifyTable(final byte[] tableName, final HTableDescriptor descriptor)
+      throws IOException;
+
+  /**
+   * Enable an existing table
+   * @param tableName The table name
+   * @throws IOException
+   */
+  public void enableTable(final byte[] tableName) throws IOException;
+
+  /**
+   * Disable an existing table
+   * @param tableName The table name
+   * @throws IOException
+   */
+  public void disableTable(final byte[] tableName) throws IOException;
+
+  /**
+   * Add a new column to an existing table
+   * @param tableName The table name
+   * @param column The column definition
+   * @throws IOException
+   */
+  public void addColumn(final byte[] tableName, final HColumnDescriptor column)
+      throws IOException;
+
+  /**
+   * Modify the column descriptor of an existing column in an existing table
+   * @param tableName The table name
+   * @param descriptor The updated column definition
+   * @throws IOException
+   */
+  public void modifyColumn(byte[] tableName, HColumnDescriptor descriptor)
+      throws IOException;
+
+  /**
+   * Delete a column from an existing table
+   * @param tableName The table name
+   * @param columnName The column name
+   * @throws IOException
+   */
+  public void deleteColumn(final byte[] tableName, final byte[] columnName)
+      throws IOException;
+
+  /**
    * @return Return table descriptors implementation.
    */
   public TableDescriptors getTableDescriptors();

Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionPlan.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionPlan.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionPlan.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionPlan.java Wed Feb 13 20:58:23 2013
@@ -117,6 +117,18 @@ public class RegionPlan implements Compa
   }
 
   @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null || getClass() != obj.getClass()) {
+      return false;
+    }
+    RegionPlan other = (RegionPlan) obj;
+    return compareTo(other) == 0;
+  }
+
+  @Override
   public String toString() {
     return "hri=" + this.hri.getRegionNameAsString() + ", src=" +
       (this.source == null? "": this.source.toString()) +

Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java Wed Feb 13 20:58:23 2013
@@ -380,7 +380,7 @@ public class ServerManager {
   public double getAverageLoad() {
     int totalLoad = 0;
     int numServers = 0;
-    double averageLoad = 0.0;
+    double averageLoad;
     for (ServerLoad sl: this.onlineServers.values()) {
         numServers++;
         totalLoad += sl.getNumberOfRegions();
@@ -680,7 +680,7 @@ public class ServerManager {
     */
   private AdminProtocol getServerConnection(final ServerName sn)
   throws IOException {
-    AdminProtocol admin = this.serverConnections.get(sn.toString());
+    AdminProtocol admin = this.serverConnections.get(sn);
     if (admin == null) {
       LOG.debug("New connection to " + sn.toString());
       admin = this.connection.getAdmin(sn.getHostname(), sn.getPort());
@@ -886,7 +886,7 @@ public class ServerManager {
    * To clear any dead server with same host name and port of any online server
    */
   void clearDeadServersWithSameHostNameAndPortOfOnlineServer() {
-    ServerName sn = null;
+    ServerName sn;
     for (ServerName serverName : getOnlineServersList()) {
       while ((sn = ServerName.
           findServerWithSameHostnamePort(this.deadservers, serverName)) != null) {

Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java Wed Feb 13 20:58:23 2013
@@ -26,6 +26,7 @@ import static org.apache.hadoop.hbase.ma
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -40,10 +41,11 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.hbase.Chore;
-import org.apache.hadoop.hbase.SplitLogCounters;
 import org.apache.hadoop.hbase.DeserializationException;
 import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.SplitLogCounters;
 import org.apache.hadoop.hbase.SplitLogTask;
 import org.apache.hadoop.hbase.Stoppable;
 import org.apache.hadoop.hbase.master.SplitLogManager.TaskFinisher.Status;
@@ -123,6 +125,8 @@ public class SplitLogManager extends Zoo
   private volatile Set<ServerName> deadWorkers = null;
   private final Object deadWorkersLock = new Object();
 
+  private Set<String> failedDeletions = null;
+
   /**
    * Wrapper around {@link #SplitLogManager(ZooKeeperWatcher zkw, Configuration conf,
    *   Stoppable stopper, MasterServices master, ServerName serverName, TaskFinisher tf)}
@@ -180,6 +184,8 @@ public class SplitLogManager extends Zoo
     this.serverName = serverName;
     this.timeoutMonitor =
       new TimeoutMonitor(conf.getInt("hbase.splitlog.manager.timeoutmonitor.period", 1000), stopper);
+
+    this.failedDeletions = Collections.synchronizedSet(new HashSet<String>());
   }
 
   public void finishInitialization(boolean masterRecovery) {
@@ -194,7 +200,7 @@ public class SplitLogManager extends Zoo
     }
   }
 
-  private FileStatus[] getFileList(List<Path> logDirs) throws IOException {
+  private FileStatus[] getFileList(List<Path> logDirs, PathFilter filter) throws IOException {
     List<FileStatus> fileStatus = new ArrayList<FileStatus>();
     for (Path hLogDir : logDirs) {
       this.fs = hLogDir.getFileSystem(conf);
@@ -202,8 +208,7 @@ public class SplitLogManager extends Zoo
         LOG.warn(hLogDir + " doesn't exist. Nothing to do!");
         continue;
       }
-      // TODO filter filenames?
-      FileStatus[] logfiles = FSUtils.listStatus(fs, hLogDir, null);
+      FileStatus[] logfiles = FSUtils.listStatus(fs, hLogDir, filter);
       if (logfiles == null || logfiles.length == 0) {
         LOG.info(hLogDir + " is empty dir, no logs to split");
       } else {
@@ -228,6 +233,7 @@ public class SplitLogManager extends Zoo
     logDirs.add(logDir);
     return splitLogDistributed(logDirs);
   }
+
   /**
    * The caller will block until all the log files of the given region server
    * have been processed - successfully split or an error is encountered - by an
@@ -239,9 +245,25 @@ public class SplitLogManager extends Zoo
    * @return cumulative size of the logfiles split
    */
   public long splitLogDistributed(final List<Path> logDirs) throws IOException {
+    return splitLogDistributed(logDirs, null);
+  }
+
+  /**
+   * The caller will block until all the META log files of the given region server
+   * have been processed - successfully split or an error is encountered - by an
+   * available worker region server. This method must only be called after the
+   * region servers have been brought online.
+   *
+   * @param logDirs List of log dirs to split
+   * @param filter the Path filter to select specific files for considering
+   * @throws IOException If there was an error while splitting any log file
+   * @return cumulative size of the logfiles split
+   */
+  public long splitLogDistributed(final List<Path> logDirs, PathFilter filter) 
+      throws IOException {
     MonitoredTask status = TaskMonitor.get().createStatus(
           "Doing distributed log split in " + logDirs);
-    FileStatus[] logfiles = getFileList(logDirs);
+    FileStatus[] logfiles = getFileList(logDirs, filter);
     status.setStatus("Checking directory contents...");
     LOG.debug("Scheduling batch of logs to split");
     SplitLogCounters.tot_mgr_log_split_batch_start.incrementAndGet();
@@ -418,11 +440,12 @@ public class SplitLogManager extends Zoo
         }
       }
     }
-    // delete the task node in zk. Keep trying indefinitely - its an async
+    // delete the task node in zk. It's an async
     // call and no one is blocked waiting for this node to be deleted. All
     // task names are unique (log.<timestamp>) there is no risk of deleting
     // a future task.
-    deleteNode(path, Long.MAX_VALUE);
+    // if a deletion fails, TimeoutMonitor will retry the same deletion later
+    deleteNode(path, zkretries);
     return;
   }
 
@@ -531,6 +554,21 @@ public class SplitLogManager extends Zoo
     }
   }
 
+  /**
+   * Helper function to check whether to abandon retries in ZooKeeper AsyncCallback functions
+   * @param statusCode integer value of a ZooKeeper exception code
+   * @param action description message about the retried action
+   * @return true when need to abandon retries otherwise false
+   */
+  private boolean needAbandonRetries(int statusCode, String action) {
+    if (statusCode == KeeperException.Code.SESSIONEXPIRED.intValue()) {
+      LOG.error("ZK session expired. Master is expected to shut down. Abandoning retries for "
+          + "action=" + action);
+      return true;
+    }
+    return false;
+  }
+
   private void heartbeat(String path, int new_version, ServerName workerName) {
     Task task = findOrCreateOrphanTask(path);
     if (new_version != task.last_version) {
@@ -662,8 +700,7 @@ public class SplitLogManager extends Zoo
   }
 
   private void deleteNodeFailure(String path) {
-    LOG.fatal("logic failure, failing to delete a node should never happen " +
-        "because delete has infinite retries");
+    LOG.info("Failed to delete node " + path + " and will retry soon.");
     return;
   }
 
@@ -847,7 +884,7 @@ public class SplitLogManager extends Zoo
     volatile long last_update;
     volatile int last_version;
     volatile ServerName cur_worker_name;
-    TaskBatch batch;
+    volatile TaskBatch batch;
     volatile TerminationStatus status;
     volatile int incarnation;
     volatile int unforcedResubmits;
@@ -1005,6 +1042,16 @@ public class SplitLogManager extends Zoo
         SplitLogCounters.tot_mgr_resubmit_unassigned.incrementAndGet();
         LOG.debug("resubmitting unassigned task(s) after timeout");
       }
+
+      // Retry previously failed deletes
+      if (failedDeletions.size() > 0) {
+        List<String> tmpPaths = new ArrayList<String>(failedDeletions);
+        for (String tmpPath : tmpPaths) {
+          // deleteNode is an async call
+          deleteNode(tmpPath, zkretries);
+        }
+        failedDeletions.removeAll(tmpPaths);
+      }
     }
   }
 
@@ -1019,6 +1066,10 @@ public class SplitLogManager extends Zoo
     public void processResult(int rc, String path, Object ctx, String name) {
       SplitLogCounters.tot_mgr_node_create_result.incrementAndGet();
       if (rc != 0) {
+        if (needAbandonRetries(rc, "Create znode " + path)) {
+          createNodeFailure(path);
+          return;
+        }
         if (rc == KeeperException.Code.NODEEXISTS.intValue()) {
           // What if there is a delete pending against this pre-existing
           // znode? Then this soon-to-be-deleted task znode must be in TASK_DONE
@@ -1058,8 +1109,7 @@ public class SplitLogManager extends Zoo
         Stat stat) {
       SplitLogCounters.tot_mgr_get_data_result.incrementAndGet();
       if (rc != 0) {
-        if (rc == KeeperException.Code.SESSIONEXPIRED.intValue()) {
-          LOG.error("ZK session expired. Master is expected to shut down. Abandoning retries.");
+        if (needAbandonRetries(rc, "GetData from znode " + path)) {
           return;
         }
         if (rc == KeeperException.Code.NONODE.intValue()) {
@@ -1113,6 +1163,10 @@ public class SplitLogManager extends Zoo
     public void processResult(int rc, String path, Object ctx) {
       SplitLogCounters.tot_mgr_node_delete_result.incrementAndGet();
       if (rc != 0) {
+        if (needAbandonRetries(rc, "Delete znode " + path)) {
+          failedDeletions.add(path);
+          return;
+        }
         if (rc != KeeperException.Code.NONODE.intValue()) {
           SplitLogCounters.tot_mgr_node_delete_err.incrementAndGet();
           Long retry_count = (Long) ctx;
@@ -1120,13 +1174,14 @@ public class SplitLogManager extends Zoo
               path + " remaining retries=" + retry_count);
           if (retry_count == 0) {
             LOG.warn("delete failed " + path);
+            failedDeletions.add(path);
             deleteNodeFailure(path);
           } else {
             deleteNode(path, retry_count - 1);
           }
           return;
         } else {
-        LOG.debug(path +
+          LOG.info(path +
             " does not exist. Either was created but deleted behind our" +
             " back by another pending delete OR was deleted" +
             " in earlier retry rounds. zkretries = " + (Long) ctx);
@@ -1151,8 +1206,7 @@ public class SplitLogManager extends Zoo
     @Override
     public void processResult(int rc, String path, Object ctx, String name) {
       if (rc != 0) {
-        if (rc == KeeperException.Code.SESSIONEXPIRED.intValue()) {
-          LOG.error("ZK session expired. Master is expected to shut down. Abandoning retries.");
+        if (needAbandonRetries(rc, "CreateRescan znode " + path)) {
           return;
         }
         Long retry_count = (Long)ctx;

Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java Wed Feb 13 20:58:23 2013
@@ -145,13 +145,23 @@ public abstract class CleanerChore<T ext
    * @return <tt>true</tt> if the directory was deleted, <tt>false</tt> otherwise.
    * @throws IOException if there is an unexpected filesystem error
    */
-  private boolean checkAndDeleteDirectory(Path toCheck) throws IOException {
+  public boolean checkAndDeleteDirectory(Path toCheck) throws IOException {
     if (LOG.isTraceEnabled()) {
       LOG.trace("Checking directory: " + toCheck);
     }
     FileStatus[] children = FSUtils.listStatus(fs, toCheck);
     // if the directory doesn't exist, then we are done
-    if (children == null) return true;
+    if (children == null) {
+      try {
+        return fs.delete(toCheck, false);
+      } catch (IOException e) {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("Couldn't delete directory: " + toCheck, e);
+        }
+      }
+      // couldn't delete w/o exception, so we can't return success.
+      return false;
+    }
 
     boolean canDeleteThis = true;
     for (FileStatus child : children) {
@@ -168,9 +178,22 @@ public abstract class CleanerChore<T ext
       }
     }
 
-    // if all the children have been deleted, then we should try to delete this directory. However,
-    // don't do so recursively so we don't delete files that have been added since we checked.
-    return canDeleteThis ? fs.delete(toCheck, false) : false;
+    // if the directory has children, we can't delete it, so we are done
+    if (!canDeleteThis) return false;
+
+    // otherwise, all the children (that we know about) have been deleted, so we should try to
+    // delete this directory. However, don't do so recursively so we don't delete files that have
+    // been added since we last checked.
+    try {
+      return fs.delete(toCheck, false);
+    } catch (IOException e) {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Couldn't delete directory: " + toCheck, e);
+      }
+    }
+
+    // couldn't delete w/o exception, so we can't return success.
+    return false;
   }
 
   /**

Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/TimeToLiveHFileCleaner.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/TimeToLiveHFileCleaner.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/TimeToLiveHFileCleaner.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/TimeToLiveHFileCleaner.java Wed Feb 13 20:58:23 2013
@@ -30,7 +30,7 @@ import org.apache.hadoop.hbase.util.Envi
 
 /**
  * HFile cleaner that uses the timestamp of the hfile to determine if it should be deleted. By
- * default they are allowed to live for {@value TimeToLiveHFileCleaner#DEFAULT_TTL}
+ * default they are allowed to live for {@value #DEFAULT_TTL}
  */
 @InterfaceAudience.Private
 public class TimeToLiveHFileCleaner extends BaseHFileCleanerDelegate {
@@ -38,7 +38,7 @@ public class TimeToLiveHFileCleaner exte
   public static final Log LOG = LogFactory.getLog(TimeToLiveHFileCleaner.class.getName());
   public static final String TTL_CONF_KEY = "hbase.master.hfilecleaner.ttl";
   // default ttl = 5 minutes
-  private static final long DEFAULT_TTL = 60000 * 5;
+  public static final long DEFAULT_TTL = 60000 * 5;
   // Configured time a hfile can be kept after it was moved to the archive
   private long ttl;
   private FileSystem fs;

Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/DeleteTableHandler.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/DeleteTableHandler.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/DeleteTableHandler.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/DeleteTableHandler.java Wed Feb 13 20:58:23 2013
@@ -97,7 +97,7 @@ public class DeleteTableHandler extends 
     FileSystem fs = mfs.getFileSystem();
     for (HRegionInfo hri: regions) {
       LOG.debug("Deleting region " + hri.getRegionNameAsString() + " from FS");
-      HFileArchiver.archiveRegion(fs, mfs.getRootDir(),
+      HFileArchiver.archiveRegion(masterServices.getConfiguration(), fs, mfs.getRootDir(),
           tempTableDir, new Path(tempTableDir, hri.getEncodedName()));
     }
 

Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/MetaServerShutdownHandler.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/MetaServerShutdownHandler.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/MetaServerShutdownHandler.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/MetaServerShutdownHandler.java Wed Feb 13 20:58:23 2013
@@ -18,11 +18,17 @@
  */
 package org.apache.hadoop.hbase.master.handler;
 
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.master.DeadServer;
 import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.zookeeper.KeeperException;
 
 /**
  * Shutdown handler for the server hosting <code>-ROOT-</code>,
@@ -32,7 +38,7 @@ import org.apache.hadoop.hbase.master.Ma
 public class MetaServerShutdownHandler extends ServerShutdownHandler {
   private final boolean carryingRoot;
   private final boolean carryingMeta;
-
+  private static final Log LOG = LogFactory.getLog(MetaServerShutdownHandler.class);
   public MetaServerShutdownHandler(final Server server,
       final MasterServices services,
       final DeadServer deadServers, final ServerName serverName,
@@ -44,11 +50,118 @@ public class MetaServerShutdownHandler e
   }
 
   @Override
+  public void process() throws IOException {
+    try {
+      LOG.info("Splitting META logs for " + serverName);
+      if (this.shouldSplitHlog) {
+        this.services.getMasterFileSystem().splitMetaLog(serverName);
+      }
+    } catch (IOException ioe) {
+      this.services.getExecutorService().submit(this);
+      this.deadServers.add(serverName);
+      throw new IOException("failed log splitting for " +
+          serverName + ", will retry", ioe);
+    }
+
+    // Assign root and meta if we were carrying them.
+    if (isCarryingRoot()) { // -ROOT-
+      // Check again: region may be assigned to other where because of RIT
+      // timeout
+      if (this.services.getAssignmentManager().isCarryingRoot(serverName)) {
+        LOG.info("Server " + serverName
+            + " was carrying ROOT. Trying to assign.");
+        this.services.getAssignmentManager().regionOffline(
+            HRegionInfo.ROOT_REGIONINFO);
+        verifyAndAssignRootWithRetries();
+      } else {
+        LOG.info("ROOT has been assigned to otherwhere, skip assigning.");
+      }
+    }
+
+    // Carrying meta?
+    if (isCarryingMeta()) {
+      // Check again: region may be assigned to other where because of RIT
+      // timeout
+      if (this.services.getAssignmentManager().isCarryingMeta(serverName)) {
+        LOG.info("Server " + serverName
+            + " was carrying META. Trying to assign.");
+        this.services.getAssignmentManager().regionOffline(
+            HRegionInfo.FIRST_META_REGIONINFO);
+        this.services.getAssignmentManager().assignMeta();
+      } else {
+        LOG.info("META has been assigned to otherwhere, skip assigning.");
+      }
+
+    }
+    super.process();
+  }
+  /**
+   * Before assign the ROOT region, ensure it haven't
+   *  been assigned by other place
+   * <p>
+   * Under some scenarios, the ROOT region can be opened twice, so it seemed online
+   * in two regionserver at the same time.
+   * If the ROOT region has been assigned, so the operation can be canceled.
+   * @throws InterruptedException
+   * @throws IOException
+   * @throws KeeperException
+   */
+  private void verifyAndAssignRoot()
+  throws InterruptedException, IOException, KeeperException {
+    long timeout = this.server.getConfiguration().
+      getLong("hbase.catalog.verification.timeout", 1000);
+    if (!this.server.getCatalogTracker().verifyRootRegionLocation(timeout)) {
+      this.services.getAssignmentManager().assignRoot();
+    } else if (serverName.equals(server.getCatalogTracker().getRootLocation())) {
+      throw new IOException("-ROOT- is onlined on the dead server "
+          + serverName);
+    } else {
+      LOG.info("Skip assigning -ROOT-, because it is online on the "
+          + server.getCatalogTracker().getRootLocation());
+    }
+  }
+
+  /**
+   * Failed many times, shutdown processing
+   * @throws IOException
+   */
+  private void verifyAndAssignRootWithRetries() throws IOException {
+    int iTimes = this.server.getConfiguration().getInt(
+        "hbase.catalog.verification.retries", 10);
+
+    long waitTime = this.server.getConfiguration().getLong(
+        "hbase.catalog.verification.timeout", 1000);
+
+    int iFlag = 0;
+    while (true) {
+      try {
+        verifyAndAssignRoot();
+        break;
+      } catch (KeeperException e) {
+        this.server.abort("In server shutdown processing, assigning root", e);
+        throw new IOException("Aborting", e);
+      } catch (Exception e) {
+        if (iFlag >= iTimes) {
+          this.server.abort("verifyAndAssignRoot failed after" + iTimes
+              + " times retries, aborting", e);
+          throw new IOException("Aborting", e);
+        }
+        try {
+          Thread.sleep(waitTime);
+        } catch (InterruptedException e1) {
+          LOG.warn("Interrupted when is the thread sleep", e1);
+          Thread.currentThread().interrupt();
+          throw new IOException("Interrupted", e1);
+        }
+        iFlag++;
+      }
+    }
+  }
+
   boolean isCarryingRoot() {
     return this.carryingRoot;
   }
 
-  @Override
   boolean isCarryingMeta() {
     return this.carryingMeta;
   }

Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java Wed Feb 13 20:58:23 2013
@@ -55,10 +55,10 @@ import org.apache.zookeeper.KeeperExcept
 @InterfaceAudience.Private
 public class ServerShutdownHandler extends EventHandler {
   private static final Log LOG = LogFactory.getLog(ServerShutdownHandler.class);
-  private final ServerName serverName;
-  private final MasterServices services;
-  private final DeadServer deadServers;
-  private final boolean shouldSplitHlog; // whether to split HLog or not
+  protected final ServerName serverName;
+  protected final MasterServices services;
+  protected final DeadServer deadServers;
+  protected final boolean shouldSplitHlog; // whether to split HLog or not
 
   public ServerShutdownHandler(final Server server, final MasterServices services,
       final DeadServer deadServers, final ServerName serverName,
@@ -91,63 +91,6 @@ public class ServerShutdownHandler exten
   }
 
   /**
-   * Before assign the ROOT region, ensure it haven't
-   *  been assigned by other place
-   * <p>
-   * Under some scenarios, the ROOT region can be opened twice, so it seemed online
-   * in two regionserver at the same time.
-   * If the ROOT region has been assigned, so the operation can be canceled.
-   * @throws InterruptedException
-   * @throws IOException
-   * @throws KeeperException
-   */
-  private void verifyAndAssignRoot()
-  throws InterruptedException, IOException, KeeperException {
-    long timeout = this.server.getConfiguration().
-      getLong("hbase.catalog.verification.timeout", 1000);
-    if (!this.server.getCatalogTracker().verifyRootRegionLocation(timeout)) {
-      this.services.getAssignmentManager().assignRoot();
-    }
-  }
-
-  /**
-   * Failed many times, shutdown processing
-   * @throws IOException
-   */
-  private void verifyAndAssignRootWithRetries() throws IOException {
-    int iTimes = this.server.getConfiguration().getInt(
-        "hbase.catalog.verification.retries", 10);
-
-    long waitTime = this.server.getConfiguration().getLong(
-        "hbase.catalog.verification.timeout", 1000);
-
-    int iFlag = 0;
-    while (true) {
-      try {
-        verifyAndAssignRoot();
-        break;
-      } catch (KeeperException e) {
-        this.server.abort("In server shutdown processing, assigning root", e);
-        throw new IOException("Aborting", e);
-      } catch (Exception e) {
-        if (iFlag >= iTimes) {
-          this.server.abort("verifyAndAssignRoot failed after" + iTimes
-              + " times retries, aborting", e);
-          throw new IOException("Aborting", e);
-        }
-        try {
-          Thread.sleep(waitTime);
-        } catch (InterruptedException e1) {
-          LOG.warn("Interrupted when is the thread sleep", e1);
-          Thread.currentThread().interrupt();
-          throw new IOException("Interrupted", e1);
-        }
-        iFlag++;
-      }
-    }
-  }
-
-  /**
    * @return True if the server we are processing was carrying <code>-ROOT-</code>
    */
   boolean isCarryingRoot() {
@@ -182,30 +125,13 @@ public class ServerShutdownHandler exten
           LOG.info("Skipping log splitting for " + serverName);
         }
       } catch (IOException ioe) {
-        this.services.getExecutorService().submit(this);
+        //typecast to SSH so that we make sure that it is the SSH instance that
+        //gets submitted as opposed to MSSH or some other derived instance of SSH
+        this.services.getExecutorService().submit((ServerShutdownHandler)this);
         this.deadServers.add(serverName);
         throw new IOException("failed log splitting for " +
           serverName + ", will retry", ioe);
       }
-
-      // Assign root and meta if we were carrying them.
-      if (isCarryingRoot()) { // -ROOT-
-        LOG.info("Server " + serverName +
-            " was carrying ROOT. Trying to assign.");
-        this.services.getAssignmentManager().
-          regionOffline(HRegionInfo.ROOT_REGIONINFO);
-        verifyAndAssignRootWithRetries();
-      }
-
-      // Carrying meta?
-      if (isCarryingMeta()) {
-        LOG.info("Server " + serverName +
-          " was carrying META. Trying to assign.");
-        this.services.getAssignmentManager().
-          regionOffline(HRegionInfo.FIRST_META_REGIONINFO);
-        this.services.getAssignmentManager().assignMeta();
-      }
-
       // We don't want worker thread in the MetaServerShutdownHandler
       // executor pool to block by waiting availability of -ROOT-
       // and .META. server. Otherwise, it could run into the following issue:
@@ -430,7 +356,7 @@ public class ServerShutdownHandler exten
     if (daughter == null) return 0;
     if (isDaughterMissing(catalogTracker, daughter)) {
       LOG.info("Fixup; missing daughter " + daughter.getRegionNameAsString());
-      MetaEditor.addDaughter(catalogTracker, daughter, null);
+      MetaEditor.addDaughter(catalogTracker, daughter, null, HConstants.NO_SEQNUM);
 
       // TODO: Log WARN if the regiondir does not exist in the fs.  If its not
       // there then something wonky about the split -- things will keep going

Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/SplitRegionHandler.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/SplitRegionHandler.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/SplitRegionHandler.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/SplitRegionHandler.java Wed Feb 13 20:58:23 2013
@@ -46,6 +46,7 @@ public class SplitRegionHandler extends 
   /**
    * For testing only!  Set to true to skip handling of split.
    */
+  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="MS_SHOULD_BE_FINAL")
   public static boolean TEST_SKIP = false;
 
   public SplitRegionHandler(Server server,

Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/TableAddFamilyHandler.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/TableAddFamilyHandler.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/TableAddFamilyHandler.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/TableAddFamilyHandler.java Wed Feb 13 20:58:23 2013
@@ -58,15 +58,13 @@ public class TableAddFamilyHandler exten
     if(cpHost != null){
       cpHost.preAddColumnHandler(this.tableName, this.familyDesc);
     }
-    // Update table descriptor in HDFS
-    HTableDescriptor htd = this.masterServices.getMasterFileSystem()
-        .addColumn(tableName, familyDesc);
-    // Update in-memory descriptor cache
-    this.masterServices.getTableDescriptors().add(htd);
+    // Update table descriptor
+    this.masterServices.getMasterFileSystem().addColumn(tableName, familyDesc);
     if(cpHost != null){
       cpHost.postAddColumnHandler(this.tableName, this.familyDesc);
     }
   }
+
   @Override
   public String toString() {
     String name = "UnknownServerName";

Modified: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/TableDeleteFamilyHandler.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/TableDeleteFamilyHandler.java?rev=1445918&r1=1445917&r2=1445918&view=diff
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/TableDeleteFamilyHandler.java (original)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/TableDeleteFamilyHandler.java Wed Feb 13 20:58:23 2013
@@ -53,11 +53,8 @@ public class TableDeleteFamilyHandler ex
     if (cpHost != null) {
       cpHost.preDeleteColumnHandler(this.tableName, this.familyName);
     }
-    // Update table descriptor in HDFS
-    HTableDescriptor htd =
-      this.masterServices.getMasterFileSystem().deleteColumn(tableName, familyName);
-    // Update in-memory descriptor cache
-    this.masterServices.getTableDescriptors().add(htd);
+    // Update table descriptor
+    this.masterServices.getMasterFileSystem().deleteColumn(tableName, familyName);
     // Remove the column family from the file system
     MasterFileSystem mfs = this.masterServices.getMasterFileSystem();
     for (HRegionInfo hri : hris) {



Mime
View raw message