hbase-commits mailing list archives

From: nspiegelb...@apache.org
Subject: svn commit: r1176171 [3/9] - in /hbase/branches/0.89: ./ bin/ src/ src/assembly/ src/docs/src/documentation/content/xdocs/ src/main/java/org/apache/hadoop/hbase/ src/main/java/org/apache/hadoop/hbase/client/ src/main/java/org/apache/hadoop/hbase/filter...
Date: Tue, 27 Sep 2011 02:41:20 GMT
Added: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/mapreduce/hadoopbackport/TotalOrderPartitioner.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/mapreduce/hadoopbackport/TotalOrderPartitioner.java?rev=1176171&view=auto
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/mapreduce/hadoopbackport/TotalOrderPartitioner.java (added)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/mapreduce/hadoopbackport/TotalOrderPartitioner.java Tue Sep 27 02:41:16 2011
@@ -0,0 +1,401 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce.hadoopbackport;
+
+import java.io.IOException;
+import java.lang.reflect.Array;
+import java.util.ArrayList;
+import java.util.Arrays;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BinaryComparable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * Partitioner effecting a total order by reading split points from
+ * an externally generated source.
+ *
+ * This is an identical copy of o.a.h.mapreduce.lib.partition.TotalOrderPartitioner
+ * from Hadoop trunk at r910774.
+ */
+public class TotalOrderPartitioner<K extends WritableComparable<?>,V>
+    extends Partitioner<K,V> implements Configurable {
+
+  private Node partitions;
+  public static final String DEFAULT_PATH = "_partition.lst";
+  public static final String PARTITIONER_PATH =
+    "mapreduce.totalorderpartitioner.path";
+  public static final String MAX_TRIE_DEPTH =
+    "mapreduce.totalorderpartitioner.trie.maxdepth";
+  public static final String NATURAL_ORDER =
+    "mapreduce.totalorderpartitioner.naturalorder";
+  Configuration conf;
+
+  public TotalOrderPartitioner() { }
+
+  /**
+   * Read in the partition file and build indexing data structures.
+   * If the keytype is {@link org.apache.hadoop.io.BinaryComparable} and
+   * <tt>mapreduce.totalorderpartitioner.naturalorder</tt> is not false, a trie
+   * of the first <tt>mapreduce.totalorderpartitioner.trie.maxdepth</tt> + 1 bytes
+   * will be built. Otherwise, keys will be located using a binary search of
+   * the partition keyset using the {@link org.apache.hadoop.io.RawComparator}
+   * defined for this job. The input file must be sorted with the same
+   * comparator and contain {@link Job#getNumReduceTasks()} - 1 keys.
+   */
+  @SuppressWarnings("unchecked") // keytype from conf not static
+  public void setConf(Configuration conf) {
+    try {
+      this.conf = conf;
+      String parts = getPartitionFile(conf);
+      final Path partFile = new Path(parts);
+      final FileSystem fs = (DEFAULT_PATH.equals(parts))
+        ? FileSystem.getLocal(conf)     // assume in DistributedCache
+        : partFile.getFileSystem(conf);
+
+      Job job = new Job(conf);
+      Class<K> keyClass = (Class<K>)job.getMapOutputKeyClass();
+      K[] splitPoints = readPartitions(fs, partFile, keyClass, conf);
+      if (splitPoints.length != job.getNumReduceTasks() - 1) {
+        throw new IOException("Wrong number of partitions in keyset: "
+            + splitPoints.length);
+      }
+      RawComparator<K> comparator =
+        (RawComparator<K>) job.getSortComparator();
+      for (int i = 0; i < splitPoints.length - 1; ++i) {
+        if (comparator.compare(splitPoints[i], splitPoints[i+1]) >= 0) {
+          throw new IOException("Split points are out of order");
+        }
+      }
+      boolean natOrder =
+        conf.getBoolean(NATURAL_ORDER, true);
+      if (natOrder && BinaryComparable.class.isAssignableFrom(keyClass)) {
+        partitions = buildTrie((BinaryComparable[])splitPoints, 0,
+            splitPoints.length, new byte[0],
+            // Now that blocks of identical splitless trie nodes are
+            // represented reentrantly, and we develop a leaf for any trie
+            // node with only one split point, the only reason for a depth
+            // limit is to prevent stack overflow or bloat in the pathological
+            // case where the split points are long and mostly look like bytes
+            // iii...iixii...iii.  Therefore, we make the default depth
+            // limit large but not huge.
+            conf.getInt(MAX_TRIE_DEPTH, 200));
+      } else {
+        partitions = new BinarySearchNode(splitPoints, comparator);
+      }
+    } catch (IOException e) {
+      throw new IllegalArgumentException("Can't read partitions file", e);
+    }
+  }
+
+  public Configuration getConf() {
+    return conf;
+  }
+
+  // By construction, we know whether our keytype is memcmp-able and uses the trie.
+  @SuppressWarnings("unchecked")
+  public int getPartition(K key, V value, int numPartitions) {
+    return partitions.findPartition(key);
+  }
+
+  /**
+   * Set the path to the SequenceFile storing the sorted partition keyset.
+   * It must be the case that for <tt>R</tt> reduces, there are <tt>R-1</tt>
+   * keys in the SequenceFile.
+   */
+  public static void setPartitionFile(Configuration conf, Path p) {
+    conf.set(PARTITIONER_PATH, p.toString());
+  }
+
+  /**
+   * Get the path to the SequenceFile storing the sorted partition keyset.
+   * @see #setPartitionFile(Configuration, Path)
+   */
+  public static String getPartitionFile(Configuration conf) {
+    return conf.get(PARTITIONER_PATH, DEFAULT_PATH);
+  }
+
+  /**
+   * Interface to the partitioner to locate a key in the partition keyset.
+   */
+  interface Node<T> {
+    /**
+     * Locate partition in keyset K, such that [Ki..Ki+1) defines a partition,
+     * with implicit K0 = -inf, Kn = +inf, and |K| = #partitions - 1.
+     */
+    int findPartition(T key);
+  }
+
+  /**
+   * Base class for trie nodes. If the keytype is memcmp-able, this builds
+   * tries of the first <tt>mapreduce.totalorderpartitioner.trie.maxdepth</tt>
+   * bytes.
+   */
+  static abstract class TrieNode implements Node<BinaryComparable> {
+    private final int level;
+    TrieNode(int level) {
+      this.level = level;
+    }
+    int getLevel() {
+      return level;
+    }
+  }
+
+  /**
+   * For types that are not {@link org.apache.hadoop.io.BinaryComparable} or
+   * where the trie is disabled by <tt>mapreduce.totalorderpartitioner.naturalorder</tt>,
+   * search the partition keyset with a binary search.
+   */
+  class BinarySearchNode implements Node<K> {
+    private final K[] splitPoints;
+    private final RawComparator<K> comparator;
+    BinarySearchNode(K[] splitPoints, RawComparator<K> comparator) {
+      this.splitPoints = splitPoints;
+      this.comparator = comparator;
+    }
+    public int findPartition(K key) {
+      final int pos = Arrays.binarySearch(splitPoints, key, comparator) + 1;
+      return (pos < 0) ? -pos : pos;
+    }
+  }
+
+  /**
+   * An inner trie node that contains 256 children based on the next
+   * character.
+   */
+  class InnerTrieNode extends TrieNode {
+    private TrieNode[] child = new TrieNode[256];
+
+    InnerTrieNode(int level) {
+      super(level);
+    }
+    public int findPartition(BinaryComparable key) {
+      int level = getLevel();
+      if (key.getLength() <= level) {
+        return child[0].findPartition(key);
+      }
+      return child[0xFF & key.getBytes()[level]].findPartition(key);
+    }
+  }
+
+  /**
+   * @param level        the tree depth at this node
+   * @param splitPoints  the full split point vector, which holds
+   *                     the split point or points this leaf node
+   *                     should contain
+   * @param lower        first INcluded element of splitPoints
+   * @param upper        first EXcluded element of splitPoints
+   * @return  a leaf node.  They come in three kinds: no split points
+   *          [and findPartition returns a canned index], one split
+   *          point [and we compare with a single comparand], or more
+   *          than one [and we do a binary search].  The last case is
+   *          rare.
+   */
+  private TrieNode LeafTrieNodeFactory(int level,
+      BinaryComparable[] splitPoints, int lower, int upper) {
+    switch (upper - lower) {
+    case 0:
+      return new UnsplitTrieNode(level, lower);
+    case 1:
+      return new SinglySplitTrieNode(level, splitPoints, lower);
+    default:
+      return new LeafTrieNode(level, splitPoints, lower, upper);
+    }
+  }
+
+  /**
+   * A leaf trie node that scans for the key between lower..upper.
+   *
+   * We don't generate many of these now, since we usually continue trie-ing
+   * when more than one split point remains at this level, and we make
+   * different objects for nodes with 0 or 1 split point.
+   */
+  private class LeafTrieNode extends TrieNode {
+    final int lower;
+    final int upper;
+    final BinaryComparable[] splitPoints;
+    LeafTrieNode(int level, BinaryComparable[] splitPoints, int lower, int upper) {
+      super(level);
+      this.lower = lower;
+      this.upper = upper;
+      this.splitPoints = splitPoints;
+    }
+    public int findPartition(BinaryComparable key) {
+      final int pos = Arrays.binarySearch(splitPoints, lower, upper, key) + 1;
+      return (pos < 0) ? -pos : pos;
+    }
+  }
+
+  private class UnsplitTrieNode extends TrieNode {
+    final int result;
+
+    UnsplitTrieNode(int level, int value) {
+      super(level);
+      this.result = value;
+    }
+
+    public int findPartition(BinaryComparable key) {
+      return result;
+    }
+  }
+
+  private class SinglySplitTrieNode extends TrieNode {
+    final int lower;
+    final BinaryComparable mySplitPoint;
+
+    SinglySplitTrieNode(int level, BinaryComparable[] splitPoints, int lower) {
+      super(level);
+      this.lower = lower;
+      this.mySplitPoint = splitPoints[lower];
+    }
+
+    public int findPartition(BinaryComparable key) {
+      return lower + (key.compareTo(mySplitPoint) < 0 ? 0 : 1);
+    }
+  }
+
+  /**
+   * Read the cut points from the given sequence file.
+   * @param fs The file system
+   * @param p The path to read
+   * @param keyClass The map output key class
+   * @param conf The job configuration
+   * @throws IOException
+   */
+                                 // matching key types enforced by passing in
+  @SuppressWarnings("unchecked") // map output key class
+  private K[] readPartitions(FileSystem fs, Path p, Class<K> keyClass,
+      Configuration conf) throws IOException {
+    SequenceFile.Reader reader = new SequenceFile.Reader(fs, p, conf);
+    ArrayList<K> parts = new ArrayList<K>();
+    K key = ReflectionUtils.newInstance(keyClass, conf);
+    NullWritable value = NullWritable.get();
+    while (reader.next(key, value)) {
+      parts.add(key);
+      key = ReflectionUtils.newInstance(keyClass, conf);
+    }
+    reader.close();
+    return parts.toArray((K[])Array.newInstance(keyClass, parts.size()));
+  }
+
+  /**
+   * This object carries a TrieNode if there is one that can be reused.
+   * Two adjacent trie node slots that contain no split points can be
+   * filled with the same trie node, even if they are not on the same
+   * level.  See buildTrieRec, below.
+   */
+  private class CarriedTrieNodeRef {
+    TrieNode content;
+
+    CarriedTrieNodeRef() {
+      content = null;
+    }
+  }
+
+  /**
+   * Given a sorted set of cut points, build a trie that will find the correct
+   * partition quickly.
+   * @param splits the list of cut points
+   * @param lower the lower bound of partitions 0..numPartitions-1
+   * @param upper the upper bound of partitions 0..numPartitions-1
+   * @param prefix the prefix that we have already checked against
+   * @param maxDepth the maximum depth we will build a trie for
+   * @return the trie node that will divide the splits correctly
+   */
+  private TrieNode buildTrie(BinaryComparable[] splits, int lower,
+          int upper, byte[] prefix, int maxDepth) {
+    return buildTrieRec(splits, lower, upper, prefix, maxDepth,
+        new CarriedTrieNodeRef());
+  }
+
+  /**
+   * This is the core of buildTrie.  The interface and stub above just add
+   * an empty CarriedTrieNodeRef.
+   *
+   * We build trie nodes in depth first order, which is also in key space
+   * order.  Every leaf node is referenced as a slot in a parent internal
+   * node.  If two adjacent slots [in the DFO] hold leaf nodes that have
+   * no split point, then they are not separated by a split point either,
+   * because there's no place in key space for that split point to exist.
+   *
+   * When that happens, the leaf nodes would be semantically identical, and
+   * we reuse the object.  A single CarriedTrieNodeRef "ref" lives for the
+   * duration of the tree-walk.  ref carries a potentially reusable, unsplit
+   * leaf node for such reuse until a leaf node with a split arises, which
+   * breaks the chain until we need to make a new unsplit leaf node.
+   *
+   * Note that this use of CarriedTrieNodeRef means that if this code is
+   * modified in any way, internal nodes must still make or fill in their
+   * subnodes in key space order.
+   */
+  private TrieNode buildTrieRec(BinaryComparable[] splits, int lower,
+      int upper, byte[] prefix, int maxDepth, CarriedTrieNodeRef ref) {
+    final int depth = prefix.length;
+    // We generate leaves for a single split point as well as for
+    // no split points.
+    if (depth >= maxDepth || lower >= upper - 1) {
+      // If we have two consecutive requests for an unsplit trie node, we
+      // can deliver the same one the second time.
+      if (lower == upper && ref.content != null) {
+        return ref.content;
+      }
+      TrieNode result = LeafTrieNodeFactory(depth, splits, lower, upper);
+      ref.content = lower == upper ? result : null;
+      return result;
+    }
+    InnerTrieNode result = new InnerTrieNode(depth);
+    // Append an extra byte onto the prefix.
+    byte[] trial = Arrays.copyOf(prefix, prefix.length + 1);
+    int currentBound = lower;
+    for (int ch = 0; ch < 0xFF; ++ch) {
+      trial[depth] = (byte) (ch + 1);
+      lower = currentBound;
+      while (currentBound < upper) {
+        if (splits[currentBound].compareTo(trial, 0, trial.length) >= 0) {
+          break;
+        }
+        currentBound += 1;
+      }
+      trial[depth] = (byte) ch;
+      result.child[0xFF & ch] =
+        buildTrieRec(splits, lower, currentBound, trial, maxDepth, ref);
+    }
+    // Pick up the rest: splits at or above prefix + 0xFF go to the last child.
+    trial[depth] = (byte) 0xFF;
+    result.child[0xFF] =
+      buildTrieRec(splits, currentBound, upper, trial, maxDepth, ref);
+    return result;
+  }
+}
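
For context: the contract documented in setConf() above is that the partition file holds exactly R-1 keys, sorted by the job's sort comparator, for R reduces. A minimal driver-side sketch of feeding this class; the key type, key values, and file path are illustrative assumptions, and only setPartitionFile() and the R-1 rule come from the code above:

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.io.NullWritable;
  import org.apache.hadoop.io.SequenceFile;
  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.mapreduce.Job;
  import org.apache.hadoop.hbase.mapreduce.hadoopbackport.TotalOrderPartitioner;

  public class TotalOrderDriver {
    public static void main(String[] args) throws Exception {
      Configuration conf = new Configuration();
      Job job = new Job(conf, "total-order-sort");
      job.setMapOutputKeyClass(Text.class);
      job.setNumReduceTasks(4);                        // R = 4 reduces ...
      Path partFile = new Path("/tmp/_partition.lst"); // hypothetical location

      // ... so the partition file needs R - 1 = 3 keys, in comparator order.
      SequenceFile.Writer w = SequenceFile.createWriter(FileSystem.get(conf),
        conf, partFile, Text.class, NullWritable.class);
      w.append(new Text("f"), NullWritable.get());
      w.append(new Text("m"), NullWritable.get());
      w.append(new Text("t"), NullWritable.get());
      w.close();

      TotalOrderPartitioner.setPartitionFile(job.getConfiguration(), partFile);
      job.setPartitionerClass(TotalOrderPartitioner.class);
      // ... set input/output formats and paths, then submit as usual.
    }
  }

Text is a BinaryComparable, so with the naturalorder default of true this job takes the trie path in setConf(); setting mapreduce.totalorderpartitioner.naturalorder to false would force the BinarySearchNode path instead.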

Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/BaseScanner.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/BaseScanner.java?rev=1176171&r1=1176170&r2=1176171&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/BaseScanner.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/BaseScanner.java Tue Sep 27 02:41:16 2011
@@ -526,32 +526,27 @@ abstract class BaseScanner extends Chore
    * @param regionServer
    * @param meta
    * @param info
-   * @param serverAddress
+   * @param hostnameAndPort hostname ':' port as it comes out of .META.
    * @param startCode
    * @throws IOException
    */
   protected void checkAssigned(final HRegionInterface regionServer,
     final MetaRegion meta, final HRegionInfo info,
-    final String serverAddress, final long startCode)
+    final String hostnameAndPort, final long startCode)
   throws IOException {
     String serverName = null;
-    String sa = serverAddress;
+    String sa = hostnameAndPort;
     long sc = startCode;
     if (sa == null || sa.length() <= 0) {
-      // Scans are sloppy.  They don't respect row locks and they get and
-      // cache a row internally so may have data that is a little stale.  Make
-      // sure that for sure this serverAddress is null.  We are trying to
-      // avoid double-assignments.  See hbase-1784.  Will have to wait till
-      // 0.21 hbase where we use zk to mediate state transitions to do better.
+      // Scans are sloppy.  They cache a row internally so may have data that
+      // is a little stale.  Make sure that for sure this serverAddress is null.
+      // We are trying to avoid double-assignments.  See hbase-1784.
       Get g = new Get(info.getRegionName());
       g.addFamily(HConstants.CATALOG_FAMILY);
       Result r = regionServer.get(meta.getRegionName(), g);
       if (r != null && !r.isEmpty()) {
         sa = getServerAddress(r);
-        if (sa != null && sa.length() > 0) {
-          // Reget startcode in case its changed in the meantime too.
-          sc = getStartCode(r);
-        }
+        sc = getStartCode(r);
       }
     }
     if (sa != null && sa.length() > 0) {
@@ -594,6 +589,7 @@ abstract class BaseScanner extends Chore
     synchronized(scannerLock){
       if (isAlive()) {
         super.interrupt();
+        LOG.info("Interrupted");
       }
     }
   }
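
The comment in this hunk is the crux: assignment decisions must not trust the scanner's cached row, so checkAssigned() re-reads the row with a point Get before concluding the region is unassigned. For reference, a hedged client-side sketch of reading that same info:server cell; the helper class and HTable usage are illustrative, and the "server" qualifier matches the SERVER_QUALIFIER used elsewhere in this commit:

  import org.apache.hadoop.hbase.HConstants;
  import org.apache.hadoop.hbase.client.Get;
  import org.apache.hadoop.hbase.client.HTable;
  import org.apache.hadoop.hbase.client.Result;
  import org.apache.hadoop.hbase.util.Bytes;

  public class MetaServerColumn {
    /** Returns "hostname:port" from a .META. row, or null if unassigned. */
    static String hostnameAndPort(HTable meta, byte[] regionName) throws Exception {
      Get g = new Get(regionName);
      g.addFamily(HConstants.CATALOG_FAMILY);
      Result r = meta.get(g);
      byte[] value = (r == null) ? null
        : r.getValue(HConstants.CATALOG_FAMILY, Bytes.toBytes("server"));
      // Empty is as good as null; compare the value.length > 0 checks
      // added to HMaster in this same commit.
      return (value == null || value.length == 0) ? null : Bytes.toString(value);
    }
  }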

Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/HMaster.java?rev=1176171&r1=1176170&r2=1176171&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/HMaster.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/HMaster.java Tue Sep 27 02:41:16 2011
@@ -447,6 +447,9 @@ public class HMaster extends Thread impl
           if (this.serverManager.numServers() == 0) {
             startShutdown();
             break;
+          } else {
+            LOG.debug("Waiting on " +
+              this.serverManager.getServersToServerInfo().keySet().toString());
           }
         }
         final HServerAddress root = this.regionManager.getRootRegionLocation();
@@ -639,17 +642,18 @@ public class HMaster extends Thread impl
 
   public MapWritable regionServerStartup(final HServerInfo serverInfo)
   throws IOException {
-    // Set the address for now even tho it will not be persisted on HRS side
-    // If the address given is not the default one, use IP given by the user.
-    if (serverInfo.getServerAddress().getBindAddress().equals(DEFAULT_HOST)) {
-      String rsAddress = HBaseServer.getRemoteAddress();
-      serverInfo.setServerAddress(new HServerAddress(rsAddress,
-        serverInfo.getServerAddress().getPort()));
-    }
+    // Set the IP into the passed-in serverInfo.  Its IP is more than likely
+    // not the IP that the master sees here.  See the end of this method, where
+    // we pass it back to the regionserver by setting "hbase.regionserver.address".
+    String rsAddress = HBaseServer.getRemoteAddress();
+    serverInfo.setServerAddress(new HServerAddress(rsAddress,
+      serverInfo.getServerAddress().getPort()));
     // Register with server manager
     this.serverManager.regionServerStartup(serverInfo);
     // Send back some config info
-    return createConfigurationSubset();
+    MapWritable mw = createConfigurationSubset();
+    mw.put(new Text("hbase.regionserver.address"), new Text(rsAddress));
+    return mw;
   }
 
   /**
@@ -658,11 +662,6 @@ public class HMaster extends Thread impl
    */
   protected MapWritable createConfigurationSubset() {
     MapWritable mw = addConfig(new MapWritable(), HConstants.HBASE_DIR);
-    // Get the real address of the HRS.
-    String rsAddress = HBaseServer.getRemoteAddress();
-    if (rsAddress != null) {
-      mw.put(new Text("hbase.regionserver.address"), new Text(rsAddress));
-    }
     return addConfig(mw, "fs.default.name");
   }
 
@@ -812,7 +811,7 @@ public class HMaster extends Thread impl
   }
 
   // TODO: Redo so this method does not duplicate code with subsequent methods.
-  private List<Pair<HRegionInfo,HServerAddress>> getTableRegions(
+  List<Pair<HRegionInfo,HServerAddress>> getTableRegions(
       final byte [] tableName)
   throws IOException {
     List<Pair<HRegionInfo,HServerAddress>> result =
@@ -838,7 +837,7 @@ public class HMaster extends Thread impl
               data.getValue(CATALOG_FAMILY, REGIONINFO_QUALIFIER));
           if (Bytes.equals(info.getTableDesc().getName(), tableName)) {
             byte [] value = data.getValue(CATALOG_FAMILY, SERVER_QUALIFIER);
-            if (value != null) {
+            if (value != null && value.length > 0) {
               HServerAddress server = new HServerAddress(Bytes.toString(value));
               result.add(new Pair<HRegionInfo,HServerAddress>(info, server));
             }
@@ -853,7 +852,7 @@ public class HMaster extends Thread impl
     return result;
   }
 
-  private Pair<HRegionInfo,HServerAddress> getTableRegionClosest(
+  Pair<HRegionInfo,HServerAddress> getTableRegionClosest(
       final byte [] tableName, final byte [] rowKey)
   throws IOException {
     Set<MetaRegion> regions =
@@ -877,7 +876,7 @@ public class HMaster extends Thread impl
             if ((Bytes.compareTo(info.getStartKey(), rowKey) >= 0) &&
                 (Bytes.compareTo(info.getEndKey(), rowKey) < 0)) {
                 byte [] value = data.getValue(CATALOG_FAMILY, SERVER_QUALIFIER);
-                if (value != null) {
+                if (value != null && value.length > 0) {
                   HServerAddress server =
                     new HServerAddress(Bytes.toString(value));
                   return new Pair<HRegionInfo,HServerAddress>(info, server);
@@ -894,7 +893,7 @@ public class HMaster extends Thread impl
     return null;
   }
 
-  private Pair<HRegionInfo,HServerAddress> getTableRegionFromName(
+  Pair<HRegionInfo,HServerAddress> getTableRegionFromName(
       final byte [] regionName)
   throws IOException {
     byte [] tableName = HRegionInfo.parseRegionName(regionName)[0];
@@ -910,7 +909,7 @@ public class HMaster extends Thread impl
       HRegionInfo info = Writables.getHRegionInfo(
           data.getValue(CATALOG_FAMILY, REGIONINFO_QUALIFIER));
       byte [] value = data.getValue(CATALOG_FAMILY, SERVER_QUALIFIER);
-      if(value != null) {
+      if(value != null && value.length > 0) {
         HServerAddress server =
           new HServerAddress(Bytes.toString(value));
         return new Pair<HRegionInfo,HServerAddress>(info, server);
@@ -993,29 +992,26 @@ public class HMaster extends Thread impl
       // Arguments are regionname and an optional server name.
       byte [] regionname = ((ImmutableBytesWritable)args[0]).get();
       LOG.debug("Attempting to close region: " + Bytes.toStringBinary(regionname));
-      String servername = null;
+      String hostnameAndPort = null;
       if (args.length == 2) {
-        servername = Bytes.toString(((ImmutableBytesWritable)args[1]).get());
+        hostnameAndPort = Bytes.toString(((ImmutableBytesWritable)args[1]).get());
       }
       // Need hri
       Result rr = getFromMETA(regionname, HConstants.CATALOG_FAMILY);
       HRegionInfo hri = getHRegionInfo(rr.getRow(), rr);
-      if (servername == null) {
+      if (hostnameAndPort == null) {
         // Get server from the .META. if it wasn't passed as argument
-        servername =
+        hostnameAndPort =
           Bytes.toString(rr.getValue(CATALOG_FAMILY, SERVER_QUALIFIER));
       }
      // Take region out of the in-transition set in case it got stuck there doing
       // an open or whatever.
       this.regionManager.clearFromInTransition(regionname);
-      // If servername is still null, then none, exit.
-      if (servername == null) break;
-      // Need to make up a HServerInfo 'servername' for that is how
-      // items are keyed in regionmanager Maps.
-      HServerAddress addr = new HServerAddress(servername);
+      // If hostnameAndPort is still null, then none, exit.
+      if (hostnameAndPort == null) break;
       long startCode =
         Bytes.toLong(rr.getValue(CATALOG_FAMILY, STARTCODE_QUALIFIER));
-      String name = HServerInfo.getServerName(addr, startCode);
+      String name = HServerInfo.getServerName(hostnameAndPort, startCode);
       LOG.info("Marking " + hri.getRegionNameAsString() +
         " as closing on " + name + "; cleaning SERVER + STARTCODE; " +
           "master will tell regionserver to close region on next heartbeat");

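Net effect of the two regionServerStartup()/createConfigurationSubset() hunks above: the master now always overwrites the reported address with the one it observed on the wire, and unconditionally echoes it back to the regionserver under "hbase.regionserver.address". A plausible sketch of the consuming side, assuming the regionserver simply adopts every key in the returned subset (the regionserver code is not part of this excerpt):

  import java.util.Map;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.io.MapWritable;
  import org.apache.hadoop.io.Writable;

  public class StartupConfig {
    /** Apply the config subset returned by regionServerStartup(). */
    static void applyMasterConfig(Configuration conf, MapWritable mw) {
      for (Map.Entry<Writable, Writable> e : mw.entrySet()) {
        // For "hbase.regionserver.address", the master's view of our
        // address wins over the locally-bound one.
        conf.set(e.getKey().toString(), e.getValue().toString());
      }
    }
  }
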
Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/OldLogsCleaner.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/OldLogsCleaner.java?rev=1176171&r1=1176170&r2=1176171&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/OldLogsCleaner.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/OldLogsCleaner.java Tue Sep 27 02:41:16 2011
@@ -27,6 +27,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Chore;
 import org.apache.hadoop.hbase.RemoteExceptionHandler;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
 
 import java.io.IOException;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -48,8 +49,6 @@ public class OldLogsCleaner extends Chor
   private final Path oldLogDir;
   private final LogCleanerDelegate logCleaner;
   private final Configuration conf;
-  // We expect a file looking like hlog.dat.ts
-  private final Pattern pattern = Pattern.compile("\\d*\\.hlog\\.dat\\.\\d*");
 
   /**
    *
@@ -92,7 +91,7 @@ public class OldLogsCleaner extends Chor
       int nbDeletedLog = 0;
       for (FileStatus file : files) {
         Path filePath = file.getPath();
-        if (pattern.matcher(filePath.getName()).matches()) {
+        if (HLog.validateHLogFilename(filePath.getName())) {
           if (logCleaner.isLogDeletable(filePath) ) {
             this.fs.delete(filePath, true);
             nbDeletedLog++;
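
Note the direction of this change: instead of hard-coding the hlog.dat-era name layout, the cleaner now defers filename knowledge to HLog. Roughly what the deleted check accepted, reconstructed from the removed lines (HLog.validateHLogFilename's actual rules may be broader):

  import java.util.regex.Pattern;

  public class OldHLogNameCheck {
    // The regex this commit removes: "<digits>.hlog.dat.<digits>".
    private static final Pattern OLD_PATTERN =
      Pattern.compile("\\d*\\.hlog\\.dat\\.\\d*");

    static boolean matchesOldLayout(String fileName) {
      return OLD_PATTERN.matcher(fileName).matches();
    }
  }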

Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/ProcessRegionOpen.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/ProcessRegionOpen.java?rev=1176171&r1=1176170&r2=1176171&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/ProcessRegionOpen.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/ProcessRegionOpen.java Tue Sep 27 02:41:16 2011
@@ -69,18 +69,18 @@ class ProcessRegionOpen extends ProcessR
     HRegionInterface server =
         master.getServerConnection().getHRegionConnection(getMetaRegion().getServer());
     LOG.info(regionInfo.getRegionNameAsString() + " open on " +
-        serverInfo.getServerAddress().toString());
+      serverInfo.getServerName());
 
     // Register the newly-available Region's location.
     Put p = new Put(regionInfo.getRegionName());
     p.add(CATALOG_FAMILY, SERVER_QUALIFIER,
-      Bytes.toBytes(serverInfo.getServerAddress().toString()));
+      Bytes.toBytes(serverInfo.getHostnamePort()));
     p.add(CATALOG_FAMILY, STARTCODE_QUALIFIER,
       Bytes.toBytes(serverInfo.getStartCode()));
     server.put(metaRegionName, p);
     LOG.info("Updated row " + regionInfo.getRegionNameAsString() +
       " in region " + Bytes.toString(metaRegionName) + " with startcode=" +
-      serverInfo.getStartCode() + ", server=" + serverInfo.getServerAddress());
+      serverInfo.getStartCode() + ", server=" + serverInfo.getHostnamePort());
     synchronized (master.getRegionManager()) {
       if (isMetaTable) {
         // It's a meta region.
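
The switch from getServerAddress() to getHostnamePort() follows the catalog convention the rest of this commit assumes: .META. stores the plain address and the startcode in separate cells, while map keys and log lines use the startcode-qualified server name. An illustration of the three forms; the values are made up and the servername separator is inferred from the ServerManager hunk below:

  String hostnamePort = "rs1.example.com:60020";             // info:server cell
  long startCode = 1285270000000L;                           // info:serverstartcode cell
  String serverName = "rs1.example.com,60020,1285270000000"; // serversToServerInfo key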

Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/RegionManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/RegionManager.java?rev=1176171&r1=1176170&r2=1176171&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/RegionManager.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/RegionManager.java Tue Sep 27 02:41:16 2011
@@ -592,17 +592,8 @@ public class RegionManager implements HC
    * regions can shut down.
    */
   public void stopScanners() {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("telling root scanner to stop");
-    }
-    rootScannerThread.interruptAndStop();
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("telling meta scanner to stop");
-    }
-    metaScannerThread.interruptAndStop();
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("meta and root scanners notified");
-    }
+    this.rootScannerThread.interruptAndStop();
+    this.metaScannerThread.interruptAndStop();
   }
 
   /** Stop the region assigner */
@@ -1152,7 +1143,8 @@ public class RegionManager implements HC
    */
   public void waitForRootRegionLocation() {
     synchronized (rootRegionLocation) {
-      while (!master.isClosed() && rootRegionLocation.get() == null) {
+      while (!master.getShutdownRequested().get() &&
+          !master.isClosed() && rootRegionLocation.get() == null) {
         // rootRegionLocation will be filled in when we get an 'open region'
         // regionServerReport message from the HRegionServer that has been
         // allocated the ROOT region below.
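
The hunk cuts off just inside the loop; presumably the body still waits on rootRegionLocation as the trailing comment says. A sketch of the full guarded wait under that assumption (the wait/notify details are a guess):

  synchronized (rootRegionLocation) {
    while (!master.getShutdownRequested().get() &&
        !master.isClosed() && rootRegionLocation.get() == null) {
      try {
        // Woken by the 'open region' report that fills in ROOT's location,
        // or by shutdown; re-check the guards either way.
        rootRegionLocation.wait();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    }
  }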

Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java?rev=1176171&r1=1176170&r2=1176171&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java Tue Sep 27 02:41:16 2011
@@ -67,8 +67,6 @@ public class ServerManager implements HC
   // The map of known server names to server info
   private final Map<String, HServerInfo> serversToServerInfo =
     new ConcurrentHashMap<String, HServerInfo>();
-  private final Map<HServerAddress, HServerInfo> serverAddressToServerInfo =
-      new ConcurrentHashMap<HServerAddress, HServerInfo>();
 
   /*
    * Set of known dead servers.  On znode expiration, servers are added here.
@@ -111,7 +109,7 @@ public class ServerManager implements HC
 
     @Override
     protected void chore() {
-      int numServers = serverAddressToServerInfo.size();
+      int numServers = serversToServerInfo.size();
       int numDeadServers = deadServers.size();
       double averageLoad = getAverageLoad();
       String deadServersList = null;
@@ -177,8 +175,7 @@ public class ServerManager implements HC
     // for processing by ProcessServerShutdown.
     HServerInfo info = new HServerInfo(serverInfo);
     String hostAndPort = info.getServerAddress().toString();
-    HServerInfo existingServer =
-      this.serverAddressToServerInfo.get(info.getServerAddress());
+    HServerInfo existingServer = haveServerWithSameHostAndPortAlready(info.getHostnamePort());
     if (existingServer != null) {
       LOG.info("Server start rejected; we already have " + hostAndPort +
         " registered; existingServer=" + existingServer + ", newServer=" + info);
@@ -193,6 +190,17 @@ public class ServerManager implements HC
     recordNewServer(info);
   }
 
+  private HServerInfo haveServerWithSameHostAndPortAlready(final String hostnamePort) {
+    synchronized (this.serversToServerInfo) {
+      for (Map.Entry<String, HServerInfo> e: this.serversToServerInfo.entrySet()) {
+        if (e.getValue().getHostnamePort().equals(hostnamePort)) {
+          return e.getValue();
+        }
+      }
+    }
+    return null;
+  }
+
   /*
    * If this server is on the dead list, reject it with a LeaseStillHeldException
    * @param serverName Server name formatted as host_port_startcode.
@@ -230,7 +238,6 @@ public class ServerManager implements HC
     Watcher watcher = new ServerExpirer(new HServerInfo(info));
     this.master.getZooKeeperWrapper().updateRSLocationGetWatch(info, watcher);
     this.serversToServerInfo.put(serverName, info);
-    this.serverAddressToServerInfo.put(info.getServerAddress(), info);
     this.serversToLoad.put(serverName, load);
     synchronized (this.loadToServers) {
       Set<String> servers = this.loadToServers.get(load);
@@ -297,10 +304,10 @@ public class ServerManager implements HC
     HServerInfo storedInfo = this.serversToServerInfo.get(info.getServerName());
     if (storedInfo == null) {
       LOG.warn("Received report from unknown server -- telling it " +
-        "to " + HMsg.CALL_SERVER_STARTUP + ": " + info.getServerName());
+        "to " + HMsg.REGIONSERVER_STOP + ": " + info.getServerName());
       // The HBaseMaster may have been restarted.
-      // Tell the RegionServer to start over and call regionServerStartup()
-      return new HMsg[] {HMsg.CALL_SERVER_STARTUP};
+      // Tell the RegionServer to abort!
+      return new HMsg[] {HMsg.REGIONSERVER_STOP};
     } else if (storedInfo.getStartCode() != info.getStartCode()) {
       // This state is reachable if:
       //
@@ -317,7 +324,7 @@ public class ServerManager implements HC
       }
 
       synchronized (this.serversToServerInfo) {
-        removeServerInfo(info.getServerName(), info.getServerAddress());
+        removeServerInfo(info.getServerName());
         notifyServers();
       }
 
@@ -339,7 +346,7 @@ public class ServerManager implements HC
     synchronized (this.serversToServerInfo) {
       // This method removes ROOT/META from the list and marks them to be
       // reassigned in addition to other housework.
-      if (removeServerInfo(serverInfo.getServerName(), serverInfo.getServerAddress())) {
+      if (removeServerInfo(serverInfo.getServerName())) {
         // Only process the exit message if the server still has registered info.
         // Otherwise we could end up processing the server exit twice.
         LOG.info("Region server " + serverInfo.getServerName() +
@@ -391,7 +398,6 @@ public class ServerManager implements HC
       final HRegionInfo[] mostLoadedRegions, HMsg[] msgs)
   throws IOException {
     // Refresh the info object and the load information
-    this.serverAddressToServerInfo.put(serverInfo.getServerAddress(), serverInfo);
     this.serversToServerInfo.put(serverInfo.getServerName(), serverInfo);
     HServerLoad load = this.serversToLoad.get(serverInfo.getServerName());
     if (load != null) {
@@ -659,10 +665,8 @@ public class ServerManager implements HC
   }
 
   /** Update a server load information because it's shutting down*/
-  private boolean removeServerInfo(final String serverName,
-      final HServerAddress serverAddress) {
+  private boolean removeServerInfo(final String serverName) {
     boolean infoUpdated = false;
-    this.serverAddressToServerInfo.remove(serverAddress);
     HServerInfo info = this.serversToServerInfo.remove(serverName);
     // Only update load information once.
     // This method can be called a couple of times during shutdown.
@@ -746,11 +750,19 @@ public class ServerManager implements HC
     }
   }
 
-  public Map<HServerAddress, HServerInfo> getServerAddressToServerInfo() {
-    // we use this one because all the puts to this map are parallel/synced with the other map.
-    synchronized (this.serversToServerInfo) {
-      return Collections.unmodifiableMap(this.serverAddressToServerInfo);
+  /**
+   * @param hsa
+   * @return The HServerInfo whose HServerAddress is <code>hsa</code> or null
+   * if nothing found.
+   */
+  public HServerInfo getHServerInfo(final HServerAddress hsa) {
+    synchronized(this.serversToServerInfo) {
+      // TODO: This is primitive.  Do a better search.
+      for (Map.Entry<String, HServerInfo> e: this.serversToServerInfo.entrySet()) {
+        if (e.getValue().getServerAddress().equals(hsa)) return e.getValue();
+      }
     }
+    return null;
   }
 
   /**
@@ -841,7 +853,6 @@ public class ServerManager implements HC
       return;
     }
     // Remove the server from the known servers lists and update load info
-    this.serverAddressToServerInfo.remove(info.getServerAddress());
     this.serversToServerInfo.remove(serverName);
     HServerLoad load = this.serversToLoad.remove(serverName);
     if (load != null) {
@@ -889,16 +900,8 @@ public class ServerManager implements HC
   }
 
   static boolean isDead(final Set<String> deadServers,
-    final String serverName, final boolean hostAndPortOnly) {
-    if (!hostAndPortOnly) return deadServers.contains(serverName);
-    String serverNameColonReplaced =
-      serverName.replaceFirst(":", HServerInfo.SERVERNAME_SEPARATOR);
-    for (String hostPortStartCode: deadServers) {
-      int index = hostPortStartCode.lastIndexOf(HServerInfo.SERVERNAME_SEPARATOR);
-      String hostPortStrippedOfStartCode = hostPortStartCode.substring(0, index);
-      if (hostPortStrippedOfStartCode.equals(serverNameColonReplaced)) return true;
-    }
-    return false;
+      final String serverName, final boolean hostAndPortOnly) {
+    return HServerInfo.isServer(deadServers, serverName, hostAndPortOnly);
   }
 
   Set<String> getDeadServers() {
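
The body deleted from isDead() above is a useful record of what HServerInfo.isServer() presumably does when hostAndPortOnly is set: normalize "host:port" into servername form and compare each dead entry with its startcode suffix stripped. Condensed, under that assumption:

  import java.util.Set;
  import org.apache.hadoop.hbase.HServerInfo;

  public class DeadServerCheck {
    // Assumed semantics of HServerInfo.isServer(deadServers, name, true):
    static boolean isDeadHostAndPort(Set<String> deadServers, String hostAndPort) {
      String wanted =
        hostAndPort.replaceFirst(":", HServerInfo.SERVERNAME_SEPARATOR);
      for (String dead : deadServers) {
        int idx = dead.lastIndexOf(HServerInfo.SERVERNAME_SEPARATOR);
        if (dead.substring(0, idx).equals(wanted)) return true;
      }
      return false;
    }
  }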

Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/TimeToLiveLogCleaner.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/TimeToLiveLogCleaner.java?rev=1176171&r1=1176170&r2=1176171&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/TimeToLiveLogCleaner.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/TimeToLiveLogCleaner.java Tue Sep 27 02:41:16 2011
@@ -40,12 +40,13 @@ public class TimeToLiveLogCleaner implem
   public boolean isLogDeletable(Path filePath) {
     long time = 0;
     long currentTime = System.currentTimeMillis();
-    System.out.println(filePath.getName());
     String[] parts = filePath.getName().split("\\.");
     try {
-      time = Long.parseLong(parts[3]);
+      time = Long.parseLong(parts[parts.length-1]);
     } catch (NumberFormatException e) {
-      e.printStackTrace();
+      LOG.error("Unable to parse the timestamp in " + filePath.getName() +
+          ", deleting it since it's invalid and may not be a hlog", e);
+      return true;
     }
     long life = currentTime - time;
     if (life < 0) {
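
The hunk ends inside the clock-skew guard; the remainder of isLogDeletable() presumably compares the file's age to the configured TTL. The whole method under that assumption (the ttl field and the skew branch's behavior are guesses):

  public boolean isLogDeletable(Path filePath) {
    long currentTime = System.currentTimeMillis();
    String[] parts = filePath.getName().split("\\.");
    long time = 0;
    try {
      // The timestamp is the last dot-separated token, wherever it sits.
      time = Long.parseLong(parts[parts.length - 1]);
    } catch (NumberFormatException e) {
      LOG.error("Unable to parse the timestamp in " + filePath.getName() +
        ", deleting it since it's invalid and may not be a hlog", e);
      return true;
    }
    long life = currentTime - time;
    if (life < 0) {
      // Clock skew: keep, don't delete, a log "from the future".  (Assumed.)
      return false;
    }
    return life > this.ttl;  // ttl: configured retention period (assumed field)
  }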

Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1176171&r1=1176170&r2=1176171&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Tue Sep 27 02:41:16 2011
@@ -26,6 +26,7 @@ import org.apache.hadoop.fs.FSDataOutput
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.DroppedSnapshotException;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HColumnDescriptor;
@@ -39,6 +40,7 @@ import org.apache.hadoop.hbase.client.De
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.RowLock;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.filter.IncompatibleFilterException;
@@ -50,8 +52,10 @@ import org.apache.hadoop.hbase.regionser
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ClassSize;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.Writables;
+import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.StringUtils;
 
@@ -225,12 +229,6 @@ public class HRegion implements HConstan
   public final static String REGIONINFO_FILE = ".regioninfo";
 
   /**
-   * REGIONINFO_FILE as byte array.
-   */
-  public final static byte [] REGIONINFO_FILE_BYTES =
-    Bytes.toBytes(REGIONINFO_FILE);
-
-  /**
    * Should only be used for testing purposes
    */
   public HRegion(){
@@ -283,12 +281,11 @@ public class HRegion implements HConstan
     this.regionInfo = regionInfo;
     this.flushListener = flushListener;
     this.threadWakeFrequency = conf.getLong(THREAD_WAKE_FREQUENCY, 10 * 1000);
-    String encodedNameStr = Integer.toString(this.regionInfo.getEncodedName());
+    String encodedNameStr = this.regionInfo.getEncodedName();
     this.regiondir = new Path(basedir, encodedNameStr);
     if (LOG.isDebugEnabled()) {
       // Write out region name as string and its encoded name.
-      LOG.debug("Creating region " + this + ", encoded=" +
-        this.regionInfo.getEncodedName());
+      LOG.debug("Creating region " + this);
     }
     this.regionCompactionDir =
       new Path(getCompactionDir(basedir), encodedNameStr);
@@ -362,9 +359,9 @@ public class HRegion implements HConstan
 
     // HRegion is ready to go!
     this.writestate.compacting = false;
-    this.lastFlushTime = System.currentTimeMillis();
-    LOG.info("region " + this + "/" + this.regionInfo.getEncodedName() +
-      " available; sequence id is " + this.minSequenceId);
+    this.lastFlushTime = EnvironmentEdgeManager.currentTimeMillis();
+    LOG.info("region " + this +
+             " available; sequence id is " + this.minSequenceId);
   }
 
   /*
@@ -385,11 +382,10 @@ public class HRegion implements HConstan
    * @return True if this region has references.
    */
   boolean hasReferences() {
-    for (Map.Entry<byte [], Store> e: this.stores.entrySet()) {
-      for (Map.Entry<Long, StoreFile> ee:
-          e.getValue().getStorefiles().entrySet()) {
+    for (Store store : this.stores.values()) {
+      for (StoreFile sf : store.getStorefiles()) {
         // Found a reference, return.
-        if (ee.getValue().isReference()) return true;
+        if (sf.isReference()) return true;
       }
     }
     return false;
@@ -670,7 +666,7 @@ public class HRegion implements HConstan
       }
       // Calculate regionid to use.  Can't be less than that of parent else
       // it'll insert into wrong location over in .META. table: HBASE-710.
-      long rid = System.currentTimeMillis();
+      long rid = EnvironmentEdgeManager.currentTimeMillis();
       if (rid < this.regionInfo.getRegionId()) {
         LOG.warn("Clock skew; parent regions id is " +
           this.regionInfo.getRegionId() + " but current time here is " + rid);
@@ -728,7 +724,7 @@ public class HRegion implements HConstan
   private Path getSplitDirForDaughter(final Path splits, final HRegionInfo hri)
   throws IOException {
     Path d =
-      new Path(splits, Integer.toString(hri.getEncodedName()));
+      new Path(splits, hri.getEncodedName());
     if (fs.exists(d)) {
       // This should never happen; the splits dir will be newly made when we
       // come in here.  Even if we crashed midway through a split, the reopen
@@ -835,7 +831,7 @@ public class HRegion implements HConstan
         }
         LOG.info("Starting" + (majorCompaction? " major " : " ") +
             "compaction on region " + this);
-        long startTime = System.currentTimeMillis();
+        long startTime = EnvironmentEdgeManager.currentTimeMillis();
         doRegionCompactionPrep();
         long maxSize = -1;
         for (Store store: stores.values()) {
@@ -846,7 +842,7 @@ public class HRegion implements HConstan
           }
         }
         doRegionCompactionCleanup();
-        String timeTaken = StringUtils.formatTimeDiff(System.currentTimeMillis(),
+        String timeTaken = StringUtils.formatTimeDiff(EnvironmentEdgeManager.currentTimeMillis(),
             startTime);
         LOG.info("compaction completed on region " + this + " in " + timeTaken);
       } finally {
@@ -948,7 +944,7 @@ public class HRegion implements HConstan
    * because a Snapshot was not properly persisted.
    */
   protected boolean internalFlushcache() throws IOException {
-    final long startTime = System.currentTimeMillis();
+    final long startTime = EnvironmentEdgeManager.currentTimeMillis();
     // Clear flush flag.
     // Record latest flush time
     this.lastFlushTime = startTime;
@@ -986,13 +982,6 @@ public class HRegion implements HConstan
         storeFlushers.add(s.getStoreFlusher(completeSequenceId));
       }
 
-      // This thread is going to cause a whole bunch of scanners to reseek.
-      // They are depending
-      // on a thread-local to know where to read from.
-      // The reason why we set it up high is so that each HRegionScanner only
-      // has a single read point for all its sub-StoreScanners.
-      ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);
-
       // prepare flush (take a snapshot)
       for (StoreFlusher flusher : storeFlushers) {
         flusher.prepare();
@@ -1030,12 +1019,9 @@ public class HRegion implements HConstan
       }
 
       try {
-        // update this again to make sure we are 'fresh'
-        ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);
-
-        if (atomicWork != null) {
-          atomicWork.call();
-        }
+        if (atomicWork != null) {
+          atomicWork.call();
+        }
 
         // Switch snapshot (in memstore) -> new hfile (thus causing
         // all the store scanners to reset/reseek).
@@ -1087,7 +1073,7 @@ public class HRegion implements HConstan
     }
 
     if (LOG.isDebugEnabled()) {
-      long now = System.currentTimeMillis();
+      long now = EnvironmentEdgeManager.currentTimeMillis();
       LOG.debug("Finished memstore flush of ~" +
         StringUtils.humanReadableInt(currentMemStoreSize) + " for region " +
         this + " in " + (now - startTime) + "ms, sequence id=" + sequenceId +
@@ -1192,7 +1178,7 @@ public class HRegion implements HConstan
     newScannerLock.readLock().lock();
     try {
       if (this.closed.get()) {
-        throw new IOException("Region " + this + " closed");
+        throw new NotServingRegionException("Region " + this + " closed");
       }
       // Verify families are all valid
       if(scan.hasFamilies()) {
@@ -1215,6 +1201,26 @@ public class HRegion implements HConstan
     return new RegionScanner(scan, additionalScanners);
   }
 
+  /*
+   * @param delete The passed delete is modified by this method. WARNING!
+   */
+  private void prepareDelete(Delete delete) throws IOException {
+    // Check to see if this is a deleteRow insert
+    if(delete.getFamilyMap().isEmpty()){
+      for(byte [] family : regionInfo.getTableDesc().getFamiliesKeys()){
+        // Don't eat the timestamp
+        delete.deleteFamily(family, delete.getTimeStamp());
+      }
+    } else {
+      for(byte [] family : delete.getFamilyMap().keySet()) {
+        if(family == null) {
+          throw new NoSuchColumnFamilyException("Empty family is invalid");
+        }
+        checkFamily(family);
+      }
+    }
+  }
+
   //////////////////////////////////////////////////////////////////////////////
   // set() methods for client use.
   //////////////////////////////////////////////////////////////////////////////
@@ -1235,22 +1241,8 @@ public class HRegion implements HConstan
       // If we did not pass an existing row lock, obtain a new one
       lid = getLock(lockid, row);
 
-      // Check to see if this is a deleteRow insert
-      if(delete.getFamilyMap().isEmpty()){
-        for(byte [] family : regionInfo.getTableDesc().getFamiliesKeys()){
-          // Don't eat the timestamp
-          delete.deleteFamily(family, delete.getTimeStamp());
-        }
-      } else {
-        for(byte [] family : delete.getFamilyMap().keySet()) {
-          if(family == null) {
-            throw new NoSuchColumnFamilyException("Empty family is invalid");
-          }
-          checkFamily(family);
-        }
-      }
-
       // All edits for the given row (across all column families) must happen atomically.
+      prepareDelete(delete);
       delete(delete.getFamilyMap(), writeToWAL);
 
     } finally {
@@ -1267,7 +1259,7 @@ public class HRegion implements HConstan
    */
   public void delete(Map<byte[], List<KeyValue>> familyMap, boolean writeToWAL)
   throws IOException {
-    long now = System.currentTimeMillis();
+    long now = EnvironmentEdgeManager.currentTimeMillis();
     byte [] byteNow = Bytes.toBytes(now);
     boolean flush = false;
 
@@ -1428,7 +1420,6 @@ public class HRegion implements HConstan
       // If we did not pass an existing row lock, obtain a new one
       Integer lid = getLock(lockid, row);
 
-      byte [] now = Bytes.toBytes(System.currentTimeMillis());
       try {
         // All edits for the given row (across all column families) must happen atomically.
         put(put.getFamilyMap(), writeToWAL);
@@ -1443,7 +1434,7 @@ public class HRegion implements HConstan
 
   //TODO, Think that gets/puts and deletes should be refactored a bit so that
   //the getting of the lock happens before, so that you would just pass it into
-  //the methods. So in the case of checkAndPut you could just do lockRow,
+  //the methods. So in the case of checkAndMutate you could just do lockRow,
   //get, put, unlockRow or something
   /**
    *
@@ -1451,27 +1442,29 @@ public class HRegion implements HConstan
    * @param family
    * @param qualifier
    * @param expectedValue
-   * @param put
    * @param lockId
    * @param writeToWAL
    * @throws IOException
    * @return true if the new put was execute, false otherwise
    */
-  public boolean checkAndPut(byte [] row, byte [] family, byte [] qualifier,
-      byte [] expectedValue, Put put, Integer lockId, boolean writeToWAL)
+  public boolean checkAndMutate(byte [] row, byte [] family, byte [] qualifier,
+      byte [] expectedValue, Writable w, Integer lockId, boolean writeToWAL)
   throws IOException{
     checkReadOnly();
     //TODO, add check for value length or maybe even better move this to the
     //client if this becomes a global setting
     checkResources();
+    boolean isPut = w instanceof Put;
+    if (!isPut && !(w instanceof Delete))
+      throw new IOException("Action must be Put or Delete");
+
     splitsAndClosesLock.readLock().lock();
     try {
-      Get get = new Get(row, put.getRowLock());
+      RowLock lock = isPut ? ((Put)w).getRowLock() : ((Delete)w).getRowLock();
+      Get get = new Get(row, lock);
       checkFamily(family);
       get.addColumn(family, qualifier);
 
-      byte [] now = Bytes.toBytes(System.currentTimeMillis());
-
       // Lock row
       Integer lid = getLock(lockId, get.getRow());
       List<KeyValue> result = new ArrayList<KeyValue>();
@@ -1486,10 +1479,16 @@ public class HRegion implements HConstan
           byte [] actualValue = result.get(0).getValue();
           matches = Bytes.equals(expectedValue, actualValue);
         }
-        //If matches put the new put
+        //If matches put the new put or delete the new delete
         if (matches) {
           // All edits for the given row (across all column families) must happen atomically.
-          put(put.getFamilyMap(), writeToWAL);
+          if (isPut) {
+            put(((Put)w).getFamilyMap(), writeToWAL);
+          } else {
+            Delete d = (Delete)w;
+            prepareDelete(d);
+            delete(d.getFamilyMap(), writeToWAL);
+          }
           return true;
         }
         return false;
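
With checkAndPut generalized to checkAndMutate above, the same compare-then-act path now covers conditional deletes. A hedged caller-side sketch; the helper class and literal values are illustrative:

  import java.io.IOException;
  import org.apache.hadoop.hbase.client.Delete;
  import org.apache.hadoop.hbase.regionserver.HRegion;
  import org.apache.hadoop.hbase.util.Bytes;

  public class ConditionalDelete {
    static boolean deleteIfUnchanged(HRegion region) throws IOException {
      byte[] row = Bytes.toBytes("r1");
      Delete d = new Delete(row);
      // Drop the row only if cf:q still holds "v1".  Anything other than a
      // Put or Delete is rejected with "Action must be Put or Delete".
      return region.checkAndMutate(row, Bytes.toBytes("cf"), Bytes.toBytes("q"),
        Bytes.toBytes("v1"), d, null /* no prior row lock */, true /* WAL */);
    }
  }
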
@@ -1511,18 +1510,15 @@ public class HRegion implements HConstan
    * @return <code>true</code> when updating the time stamp completed.
    */
   private boolean updateKeys(List<KeyValue> keys, byte [] now) {
-    if(keys == null || keys.isEmpty()) {
+    if (keys == null || keys.isEmpty()) {
       return false;
     }
-    for(KeyValue key : keys) {
-      if(key.getTimestamp() == HConstants.LATEST_TIMESTAMP) {
-        key.updateLatestStamp(now);
-      }
+    for (KeyValue key : keys) {
+      key.updateLatestStamp(now);
     }
     return true;
   }
 
-
 //  /*
 //   * Utility method to verify values length.
 //   * @param batchUpdate The update to verify
@@ -1618,7 +1614,7 @@ public class HRegion implements HConstan
    */
   private void put(final Map<byte [], List<KeyValue>> familyMap,
       boolean writeToWAL) throws IOException {
-    long now = System.currentTimeMillis();
+    long now = EnvironmentEdgeManager.currentTimeMillis();
     byte[] byteNow = Bytes.toBytes(now);
     boolean flush = false;
     this.updatesLock.readLock().lock();
@@ -1891,6 +1887,23 @@ public class HRegion implements HConstan
     }
   }
 
+  public void bulkLoadHFile(String hfilePath, byte[] familyName)
+  throws IOException {
+    splitsAndClosesLock.readLock().lock();
+    try {
+      Store store = getStore(familyName);
+      if (store == null) {
+        throw new DoNotRetryIOException(
+            "No such column family " + Bytes.toStringBinary(familyName));
+      }
+      store.bulkLoadHFile(hfilePath);
+    } finally {
+      splitsAndClosesLock.readLock().unlock();
+    }
+  }
+
   @Override
   public boolean equals(Object o) {
     if (!(o instanceof HRegion)) {
@@ -1925,14 +1938,12 @@ public class HRegion implements HConstan
     private final byte [] stopRow;
     private Filter filter;
     private List<KeyValue> results = new ArrayList<KeyValue>();
-    private int isScan;
     private int batch;
-    // Doesn't need to be volatile, always accessed under a sync'ed method
+    private int isScan;
     private boolean filterClosed = false;
-    private Scan theScan = null;
-    private List<KeyValueScanner> extraScanners = null;
+    private long readPt;
 
-    RegionScanner(Scan scan, List<KeyValueScanner> additionalScanners) {
+    RegionScanner(Scan scan, List<KeyValueScanner> additionalScanners) throws IOException {
       //DebugPrint.println("HRegionScanner.<init>");
       this.filter = scan.getFilter();
       this.batch = scan.getBatch();
@@ -1944,35 +1955,33 @@ public class HRegion implements HConstan
       // If we are doing a get, we want to be [startRow,endRow] normally
       // it is [startRow,endRow) and if startRow=endRow we get nothing.
       this.isScan = scan.isGetScan() ? -1 : 0;
-      this.theScan = scan;
-      this.extraScanners = additionalScanners;
-    }
 
-    RegionScanner(Scan scan) {
-      this(scan, null);
-    }
+      this.readPt = ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);
 
-    void initHeap() {
       List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>();
-      if (extraScanners != null) {
-        scanners.addAll(extraScanners);
+      if (additionalScanners != null) {
+        scanners.addAll(additionalScanners);
       }
 
       for (Map.Entry<byte[], NavigableSet<byte[]>> entry :
-          theScan.getFamilyMap().entrySet()) {
+          scan.getFamilyMap().entrySet()) {
         Store store = stores.get(entry.getKey());
-        scanners.add(store.getScanner(theScan, entry.getValue()));
+        scanners.add(store.getScanner(scan, entry.getValue()));
       }
       this.storeHeap = new KeyValueHeap(scanners, comparator);
     }
 
-    private void resetFilters() {
+    RegionScanner(Scan scan) throws IOException {
+      this(scan, null);
+    }
+
+    /**
+     * Reset the scanner's filter so its per-row state starts fresh.
+     */
+    protected void resetFilters() {
       if (filter != null) {
         filter.reset();
       }
-
-      // Start the next row read and reset the thread point
-      ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);
     }
 
     public synchronized boolean next(List<KeyValue> outResults, int limit)
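
The net effect of this hunk: the RWCC read point is captured once when the scanner is built, and re-installed before every batch (see the next hunk), instead of being re-read per row. A condensed sketch of the pattern, using only methods this commit adds or already has:

import org.apache.hadoop.hbase.regionserver.ReadWriteConsistencyControl;

// Capture the memstore read point when a scanner opens; restore it before
// every batch, because the RPC handler thread can change between calls.
class ReadPointSnapshot {
  private final long readPt;

  ReadPointSnapshot(ReadWriteConsistencyControl rwcc) {
    this.readPt = ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);
  }

  void beforeBatch() {
    ReadWriteConsistencyControl.setThreadReadPoint(this.readPt);
  }
}
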
@@ -1989,11 +1998,7 @@ public class HRegion implements HConstan
       }
 
       // This could be a new thread from the last time we called next().
-      ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);
-      // lazy init the store heap.
-      if (storeHeap == null) {
-        initHeap();
-      }
+      ReadWriteConsistencyControl.setThreadReadPoint(this.readPt);
 
       results.clear();
       boolean returnResult = nextInternal(limit);
@@ -2036,7 +2041,7 @@ public class HRegion implements HConstan
         } else {
           byte [] nextRow;
           do {
-            this.storeHeap.next(results, limit);
+            this.storeHeap.next(results, limit - results.size());
             if (limit > 0 && results.size() == limit) {
               if (this.filter != null && filter.hasFilterRow()) throw new IncompatibleFilterException(
                   "Filter with filterRow(List<KeyValue>) incompatible with scan with limit!");
@@ -2230,7 +2235,7 @@ public class HRegion implements HConstan
     try {
       List<KeyValue> edits = new ArrayList<KeyValue>();
       edits.add(new KeyValue(row, CATALOG_FAMILY, REGIONINFO_QUALIFIER,
-          System.currentTimeMillis(), Writables.getBytes(r.getRegionInfo())));
+          EnvironmentEdgeManager.currentTimeMillis(), Writables.getBytes(r.getRegionInfo())));
       meta.put(HConstants.CATALOG_FAMILY, edits);
     } finally {
       meta.releaseRowLock(lid);
@@ -2322,8 +2327,8 @@ public class HRegion implements HConstan
    * @param name ENCODED region name
    * @return Path of HRegion directory
    */
-  public static Path getRegionDir(final Path tabledir, final int name) {
-    return new Path(tabledir, Integer.toString(name));
+  public static Path getRegionDir(final Path tabledir, final String name) {
+    return new Path(tabledir, name);
   }
 
   /**
@@ -2336,7 +2341,7 @@ public class HRegion implements HConstan
   public static Path getRegionDir(final Path rootdir, final HRegionInfo info) {
     return new Path(
       HTableDescriptor.getTableDir(rootdir, info.getTableDesc().getName()),
-      Integer.toString(info.getEncodedName()));
+                                   info.getEncodedName());
   }
 
   /**
@@ -2464,7 +2469,7 @@ public class HRegion implements HConstan
 
     HRegionInfo newRegionInfo = new HRegionInfo(tabledesc, startKey, endKey);
     LOG.info("Creating new region " + newRegionInfo.toString());
-    int encodedName = newRegionInfo.getEncodedName();
+    String encodedName = newRegionInfo.getEncodedName();
     Path newRegionDir = HRegion.getRegionDir(a.getBaseDir(), encodedName);
     if(fs.exists(newRegionDir)) {
       throw new IOException("Cannot merge; target file collision at " +
@@ -2666,12 +2671,12 @@ public class HRegion implements HConstan
 
      // build the KeyValue now:
       KeyValue newKv = new KeyValue(row, family,
-          qualifier, System.currentTimeMillis(),
+          qualifier, EnvironmentEdgeManager.currentTimeMillis(),
           Bytes.toBytes(result));
 
       // now log it:
       if (writeToWAL) {
-        long now = System.currentTimeMillis();
+        long now = EnvironmentEdgeManager.currentTimeMillis();
         WALEdit walEdit = new WALEdit();
         walEdit.add(newKv);
         this.log.append(regionInfo, regionInfo.getTableDesc().getName(),
@@ -2882,7 +2887,7 @@ public class HRegion implements HConstan
     Configuration c = HBaseConfiguration.create();
     FileSystem fs = FileSystem.get(c);
     Path logdir = new Path(c.get("hbase.tmp.dir"),
-      "hlog" + tableDir.getName() + System.currentTimeMillis());
+      "hlog" + tableDir.getName() + EnvironmentEdgeManager.currentTimeMillis());
     Path oldLogDir = new Path(c.get("hbase.tmp.dir"), HREGION_OLDLOGDIR_NAME);
     HLog log = new HLog(fs, logdir, oldLogDir, c, null);
     try {

Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1176171&r1=1176170&r2=1176171&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Tue Sep 27 02:41:16 2011
@@ -488,35 +488,6 @@ public class HRegionServer implements HC
               LOG.info(msgs[i].toString());
               this.connection.unsetRootRegionLocation();
               switch(msgs[i].getType()) {
-              case MSG_CALL_SERVER_STARTUP:
-                // We the MSG_CALL_SERVER_STARTUP on startup but we can also
-                // get it when the master is panicking because for instance
-                // the HDFS has been yanked out from under it.  Be wary of
-                // this message.
-                if (checkFileSystem()) {
-                  closeAllRegions();
-                  try {
-                    hlog.closeAndDelete();
-                  } catch (Exception e) {
-                    LOG.error("error closing and deleting HLog", e);
-                  }
-                  try {
-                    serverInfo.setStartCode(System.currentTimeMillis());
-                    hlog = setupHLog();
-                  } catch (IOException e) {
-                    this.abortRequested = true;
-                    this.stopRequested.set(true);
-                    e = RemoteExceptionHandler.checkIOException(e);
-                    LOG.fatal("error restarting server", e);
-                    break;
-                  }
-                  reportForDuty();
-                  restart = true;
-                } else {
-                  LOG.fatal("file system available check failed. " +
-                  "Shutting down server.");
-                }
-                break;
 
               case MSG_REGIONSERVER_STOP:
                 stopRequested.set(true);
@@ -624,8 +595,7 @@ public class HRegionServer implements HC
         }
         closeAllRegions(); // Don't leave any open file handles
       }
-      LOG.info("aborting server at: " +
-        serverInfo.getServerAddress().toString());
+      LOG.info("aborting server at: " + this.serverInfo.getServerName());
     } else {
       ArrayList<HRegion> closedRegions = closeAllRegions();
       try {
@@ -647,14 +617,13 @@ public class HRegionServer implements HC
         }
 
         LOG.info("telling master that region server is shutting down at: " +
-            serverInfo.getServerAddress().toString());
+            serverInfo.getServerName());
         hbaseMaster.regionServerReport(serverInfo, exitMsg, (HRegionInfo[])null);
       } catch (Throwable e) {
         LOG.warn("Failed to send exiting message to master: ",
           RemoteExceptionHandler.checkThrowable(e));
       }
-      LOG.info("stopping server at: " +
-        serverInfo.getServerAddress().toString());
+      LOG.info("stopping server at: " + this.serverInfo.getServerName());
     }
 
     // Make sure the proxy is down.
@@ -921,8 +890,7 @@ public class HRegionServer implements HC
     return isOnline;
   }
 
-  private HLog setupHLog() throws RegionServerRunningException,
-    IOException {
+  private HLog setupHLog() throws IOException {
     Path oldLogDir = new Path(rootDir, HREGION_OLDLOGDIR_NAME);
     Path logdir = new Path(rootDir, HLog.getHLogDirectoryName(this.serverInfo));
     if (LOG.isDebugEnabled()) {
@@ -930,7 +898,7 @@ public class HRegionServer implements HC
     }
     if (fs.exists(logdir)) {
       throw new RegionServerRunningException("region server already " +
-        "running at " + this.serverInfo.getServerAddress().toString() +
+        "running at " + this.serverInfo.getServerName() +
         " because logdir " + logdir.toString() + " exists");
     }
     HLog newlog = instantiateHLog(logdir, oldLogDir);
@@ -939,7 +907,8 @@ public class HRegionServer implements HC
 
   // instantiate
   protected HLog instantiateHLog(Path logdir, Path oldLogDir) throws IOException {
-    HLog newlog = new HLog(fs, logdir, oldLogDir, conf, hlogRoller);
+    HLog newlog = new HLog(fs, logdir, oldLogDir, conf, hlogRoller, null,
+        serverInfo.getServerAddress().toString());
     return newlog;
   }
 
@@ -1064,8 +1033,10 @@ public class HRegionServer implements HC
           // auto bind enabled, try to use another port
           LOG.info("Failed binding http info server to port: " + port);
           port++;
-          // update HRS server info
-          this.serverInfo.setInfoPort(port);
+          // update HRS server info port.
+          this.serverInfo = new HServerInfo(this.serverInfo.getServerAddress(),
+            this.serverInfo.getStartCode(),  port,
+            this.serverInfo.getHostname());
         }
       }
     }
@@ -1150,7 +1121,9 @@ public class HRegionServer implements HC
   public void abort() {
     this.abortRequested = true;
     this.reservedSpace.clear();
-    LOG.info("Dump of metrics: " + this.metrics.toString());
+    if (this.metrics != null) {
+      LOG.info("Dump of metrics: " + this.metrics.toString());
+    }
     stop();
   }
 
@@ -1234,7 +1207,7 @@ public class HRegionServer implements HC
         lastMsg = System.currentTimeMillis();
         boolean startCodeOk = false;
         while(!startCodeOk) {
-          serverInfo.setStartCode(System.currentTimeMillis());
+          this.serverInfo = createServerInfoWithNewStartCode(this.serverInfo);
           startCodeOk = zooKeeperWrapper.writeRSLocation(this.serverInfo);
           if(!startCodeOk) {
            LOG.debug("Start code already taken, trying another one");
@@ -1250,6 +1223,11 @@ public class HRegionServer implements HC
     return result;
   }
 
+  private HServerInfo createServerInfoWithNewStartCode(final HServerInfo hsi) {
+    return new HServerInfo(hsi.getServerAddress(), hsi.getInfoPort(),
+      hsi.getHostname());
+  }
+
   /* Add to the outbound message buffer */
   private void reportOpen(HRegionInfo region) {
     this.outboundMsgs.add(new HMsg(HMsg.Type.MSG_REPORT_OPEN, region));
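
createServerInfoWithNewStartCode() passes no start code, so the retry loop above only terminates if the three-argument HServerInfo constructor stamps a fresh one. That constructor is not shown in this diff; the paraphrase below is an assumption consistent with how the loop behaves, not a copy:

// Assumed shape of the 3-arg constructor: delegate to the 4-arg form with
// the current clock as the start code, so each retry yields a new value.
public HServerInfo(HServerAddress serverAddress, int infoPort, String hostname) {
  this(serverAddress, System.currentTimeMillis(), infoPort, hostname);
}
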
@@ -1700,6 +1678,24 @@ public class HRegionServer implements HC
     return -1;
   }
 
+  private boolean checkAndMutate(final byte[] regionName, final byte [] row,
+      final byte [] family, final byte [] qualifier, final byte [] value,
+      final Writable w, Integer lock) throws IOException {
+    checkOpen();
+    this.requestCount.incrementAndGet();
+    HRegion region = getRegion(regionName);
+    try {
+      if (!region.getRegionInfo().isMetaTable()) {
+        this.cacheFlusher.reclaimMemStoreMemory();
+      }
+      return region.checkAndMutate(row, family, qualifier, value, w, lock,
+          true);
+    } catch (Throwable t) {
+      throw convertThrowableToIOE(cleanup(t));
+    }
+  }
+
+
   /**
    *
    * @param regionName
@@ -1714,23 +1710,26 @@ public class HRegionServer implements HC
   public boolean checkAndPut(final byte[] regionName, final byte [] row,
       final byte [] family, final byte [] qualifier, final byte [] value,
       final Put put) throws IOException{
-    //Getting actual value
-    Get get = new Get(row);
-    get.addColumn(family, qualifier);
+    return checkAndMutate(regionName, row, family, qualifier, value, put,
+        getLockFromId(put.getLockId()));
+  }
 
-    checkOpen();
-    this.requestCount.incrementAndGet();
-    HRegion region = getRegion(regionName);
-    try {
-      if (!region.getRegionInfo().isMetaTable()) {
-        this.cacheFlusher.reclaimMemStoreMemory();
-      }
-      boolean retval = region.checkAndPut(row, family, qualifier, value, put,
-        getLockFromId(put.getLockId()), true);
-      return retval;
-    } catch (Throwable t) {
-      throw convertThrowableToIOE(cleanup(t));
-    }
+  /**
+   *
+   * @param regionName
+   * @param row
+   * @param family
+   * @param qualifier
+   * @param value the expected value
+   * @param delete the delete to apply if the expected value matches
+   * @throws IOException
+   * @return true if the new delete was executed, false otherwise
+   */
+  public boolean checkAndDelete(final byte[] regionName, final byte [] row,
+      final byte [] family, final byte [] qualifier, final byte [] value,
+      final Delete delete) throws IOException{
+    return checkAndMutate(regionName, row, family, qualifier, value, delete,
+        getLockFromId(delete.getLockId()));
   }
 
   //
@@ -2003,6 +2002,14 @@ public class HRegionServer implements HC
     }
   }
 
+  @Override
+  public void bulkLoadHFile(
+      String hfilePath, byte[] regionName, byte[] familyName)
+  throws IOException {
+    HRegion region = getRegion(regionName);
+    region.bulkLoadHFile(hfilePath, familyName);
+  }
+
   Map<String, Integer> rowlocks =
     new ConcurrentHashMap<String, Integer>();
 
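For completeness, a sketch of how a client might reach this new RPC: locate the region serving the HFile's key range, then call the region server directly. It assumes the 0.89 HConnection/HRegionInterface API (method names as remembered; verify before use):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.ipc.HRegionInterface;
import org.apache.hadoop.hbase.util.Bytes;

public class BulkLoadCallSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    HTable table = new HTable(conf, "t1");  // hypothetical table
    // Locate the region serving a row inside the staged HFile's key range.
    HRegionLocation loc = table.getRegionLocation(Bytes.toBytes("row1"));
    HConnection conn = HConnectionManager.getConnection(conf);
    HRegionInterface server = conn.getHRegionConnection(loc.getServerAddress());
    // Hand the staged file to that region, bypassing the write path.
    server.bulkLoadHFile("/staging/cf/hfile.0001",
        loc.getRegionInfo().getRegionName(), Bytes.toBytes("cf"));
  }
}
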
@@ -2452,4 +2459,5 @@ public class HRegionServer implements HC
         HRegionServer.class);
     doMain(args, regionServerClass);
   }
+
 }

Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java?rev=1176171&r1=1176170&r2=1176171&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java Tue Sep 27 02:41:16 2011
@@ -72,7 +72,7 @@ public class KeyValueHeap implements Key
     return this.current.peek();
   }
 
-  public KeyValue next()  {
+  public KeyValue next()  throws IOException {
     if(this.current == null) {
       return null;
     }
@@ -178,8 +178,9 @@ public class KeyValueHeap implements Key
    * automatically closed and removed from the heap.
    * @param seekKey KeyValue to seek at or after
    * @return true if KeyValues exist at or after specified key, false if not
+   * @throws IOException
    */
-  public boolean seek(KeyValue seekKey) {
+  public boolean seek(KeyValue seekKey) throws IOException {
     if(this.current == null) {
       return false;
     }

Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java?rev=1176171&r1=1176170&r2=1176171&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java Tue Sep 27 02:41:16 2011
@@ -19,6 +19,8 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
+import java.io.IOException;
+
 import org.apache.hadoop.hbase.KeyValue;
 
 /**
@@ -35,17 +37,17 @@ public interface KeyValueScanner {
    * Return the next KeyValue in this scanner, iterating the scanner
    * @return the next KeyValue
    */
-  public KeyValue next();
+  public KeyValue next() throws IOException;
 
   /**
    * Seek the scanner at or after the specified KeyValue.
    * @param key seek value
    * @return true if scanner has values left, false if end of scanner
    */
-  public boolean seek(KeyValue key);
+  public boolean seek(KeyValue key) throws IOException;
 
   /**
    * Close the KeyValue scanner.
    */
   public void close();
-}
\ No newline at end of file
+}
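
With next() and seek() now declared to throw IOException, disk-backed scanners can surface read errors directly instead of wrapping them in runtime exceptions. A trivial conforming implementation, just to show the revised contract (peek() is assumed unchanged in the part of the interface not shown in this hunk):

import java.io.IOException;

import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.regionserver.KeyValueScanner;

// Minimal scanner over nothing; demonstrates the new checked signatures.
public class EmptyKeyValueScanner implements KeyValueScanner {
  public KeyValue peek() {
    return null;                 // nothing buffered
  }
  public KeyValue next() throws IOException {
    return null;                 // no more KeyValues
  }
  public boolean seek(KeyValue key) throws IOException {
    return false;                // nothing at or after any key
  }
  public void close() {
    // no resources to release
  }
}
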

Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/MinorCompactingStoreScanner.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/MinorCompactingStoreScanner.java?rev=1176171&r1=1176170&r2=1176171&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/MinorCompactingStoreScanner.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/MinorCompactingStoreScanner.java Tue Sep 27 02:41:16 2011
@@ -36,7 +36,8 @@ public class MinorCompactingStoreScanner
   private KeyValueHeap heap;
   private KeyValue.KVComparator comparator;
 
-  MinorCompactingStoreScanner(Store store, List<? extends KeyValueScanner> scanners) {
+  MinorCompactingStoreScanner(Store store, List<? extends KeyValueScanner> scanners)
+      throws IOException {
     comparator = store.comparator;
     KeyValue firstKv = KeyValue.createFirstOnRow(HConstants.EMPTY_START_ROW);
     for (KeyValueScanner scanner : scanners ) {
@@ -46,7 +47,8 @@ public class MinorCompactingStoreScanner
   }
 
   MinorCompactingStoreScanner(String cfName, KeyValue.KVComparator comparator,
-                              List<? extends KeyValueScanner> scanners) {
+                              List<? extends KeyValueScanner> scanners)
+      throws IOException {
     this.comparator = comparator;
 
     KeyValue firstKv = KeyValue.createFirstOnRow(HConstants.EMPTY_START_ROW);
@@ -61,7 +63,7 @@ public class MinorCompactingStoreScanner
     return heap.peek();
   }
 
-  public KeyValue next() {
+  public KeyValue next() throws IOException {
     return heap.next();
   }
 

Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/ReadWriteConsistencyControl.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/ReadWriteConsistencyControl.java?rev=1176171&r1=1176170&r2=1176171&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/ReadWriteConsistencyControl.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/ReadWriteConsistencyControl.java Tue Sep 27 02:41:16 2011
@@ -22,7 +22,11 @@ public class ReadWriteConsistencyControl
   public static long getThreadReadPoint() {
     return perThreadReadPoint.get();
   }
-  
+
+  public static void setThreadReadPoint(long readPoint) {
+    perThreadReadPoint.set(readPoint);
+  }
+
   public static long resetThreadReadPoint(ReadWriteConsistencyControl rwcc) {
     perThreadReadPoint.set(rwcc.memstoreReadPoint());
     return getThreadReadPoint();
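
setThreadReadPoint() exists because perThreadReadPoint is a ThreadLocal: a read point captured on one thread is invisible to another unless explicitly re-installed. A sketch of the handoff the RegionScanner change above relies on:

import org.apache.hadoop.hbase.regionserver.ReadWriteConsistencyControl;

public class ReadPointHandoffSketch {
  public static void main(String[] args) {
    final ReadWriteConsistencyControl rwcc = new ReadWriteConsistencyControl();
    // Capture a read point on the opening thread...
    final long opened = ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);
    // ...and re-install it on whichever thread resumes the scan; the
    // ThreadLocal value does not travel between threads by itself.
    Thread handler = new Thread(new Runnable() {
      public void run() {
        ReadWriteConsistencyControl.setThreadReadPoint(opened);
        // reads on this thread now see the snapshot taken at open time
      }
    });
    handler.start();
  }
}
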


