hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From st...@apache.org
Subject svn commit: r908846 - in /hadoop/hbase/branches/0.20: ./ src/java/org/apache/hadoop/hbase/ src/java/org/apache/hadoop/hbase/master/ src/java/org/apache/hadoop/hbase/regionserver/ src/test/org/apache/hadoop/hbase/
Date Thu, 11 Feb 2010 05:40:46 GMT
Author: stack
Date: Thu Feb 11 05:40:45 2010
New Revision: 908846

URL: http://svn.apache.org/viewvc?rev=908846&view=rev
Log:
HBASE-2190 HRS should report to master when HMsg are available

Modified:
    hadoop/hbase/branches/0.20/CHANGES.txt
    hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/HMsg.java
    hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/master/ServerManager.java
    hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
    hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/TestHMsg.java

Modified: hadoop/hbase/branches/0.20/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/CHANGES.txt?rev=908846&r1=908845&r2=908846&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20/CHANGES.txt (original)
+++ hadoop/hbase/branches/0.20/CHANGES.txt Thu Feb 11 05:40:45 2010
@@ -10,6 +10,7 @@
    HBASE-2180  Bad read performance from synchronizing hfile.fddatainputstream
    HBASE-2185  Add html version of default hbase-site.xml (Kay Kay via Stack)
    HBASE-2189  HCM trashes meta cache even when not needed
+   HBASE-2190  HRS should report to master when HMsg are available
 
   NEW FEATURES
 

Modified: hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/HMsg.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/HMsg.java?rev=908846&r1=908845&r2=908846&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/HMsg.java (original)
+++ hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/HMsg.java Thu Feb 11 05:40:45 2010
@@ -83,6 +83,7 @@
      * 
      * Note that this message is immediately followed by two MSG_REPORT_OPEN
      * messages, one for each of the new regions resulting from the split
+     * @deprecated See MSG_REPORT_SPLIT_INCLUDES_DAUGHTERS
      */
     MSG_REPORT_SPLIT,
 
@@ -108,11 +109,21 @@
      * Run Major Compaction
      */
     MSG_REGION_MAJOR_COMPACT,
+
+    /**
+     * Region server split the region associated with this message.
+     * 
+     * Its like MSG_REPORT_SPLIT only it carries the daughters in the message
+     * rather than send them individually in MSG_REPORT_OPEN messages.
+     */
+    MSG_REPORT_SPLIT_INCLUDES_DAUGHTERS,
   }
 
   private Type type = null;
   private HRegionInfo info = null;
   private byte[] message = null;
+  private HRegionInfo daughterA = null;
+  private HRegionInfo daughterB = null;
 
   /** Default constructor. Used during deserialization */
   public HMsg() {
@@ -145,6 +156,21 @@
    * @param msg Optional message (Stringified exception, etc.)
    */
   public HMsg(final HMsg.Type type, final HRegionInfo hri, final byte[] msg) {
+    this(type, hri, null, null, msg);
+  }
+
+  /**
+   * Construct a message with the specified message and HRegionInfo
+   * 
+   * @param type Message type
+   * @param hri Region to which message <code>type</code> applies.  Cannot be
+   * null.  If no info associated, used other Constructor.
+   * @param daughterA
+   * @param daughterB
+   * @param msg Optional message (Stringified exception, etc.)
+   */
+  public HMsg(final HMsg.Type type, final HRegionInfo hri,
+      final HRegionInfo daughterA, final HRegionInfo daughterB, final byte[] msg) {
     if (type == null) {
       throw new NullPointerException("Message type cannot be null");
     }
@@ -154,6 +180,8 @@
     }
     this.info = hri;
     this.message = msg;
+    this.daughterA = daughterA;
+    this.daughterB = daughterB;
   }
 
   /**
@@ -182,6 +210,22 @@
   }
 
   /**
+   * @return First daughter if Type is MSG_REPORT_SPLIT_INCLUDES_DAUGHTERS else
+   * null
+   */
+  public HRegionInfo getDaughterA() {
+    return this.daughterA;
+  }
+
+  /**
+   * @return Second daughter if Type is MSG_REPORT_SPLIT_INCLUDES_DAUGHTERS else
+   * null
+   */
+  public HRegionInfo getDaughterB() {
+    return this.daughterB;
+  }
+
+  /**
    * @see java.lang.Object#toString()
    */
   @Override
@@ -247,6 +291,10 @@
        out.writeBoolean(true);
        Bytes.writeByteArray(out, this.message);
      }
+     if (this.type.equals(Type.MSG_REPORT_SPLIT_INCLUDES_DAUGHTERS)) {
+       this.daughterA.write(out);
+       this.daughterB.write(out);
+     }
    }
 
   /**
@@ -260,5 +308,11 @@
      if (hasMessage) {
        this.message = Bytes.readByteArray(in);
      }
+     if (this.type.equals(Type.MSG_REPORT_SPLIT_INCLUDES_DAUGHTERS)) {
+       this.daughterA = new HRegionInfo();
+       this.daughterB = new HRegionInfo();
+       this.daughterA.readFields(in);
+       this.daughterB.readFields(in);
+     }
    }
-}
+}
\ No newline at end of file

Modified: hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/master/ServerManager.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/master/ServerManager.java?rev=908846&r1=908845&r2=908846&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/master/ServerManager.java (original)
+++ hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/master/ServerManager.java Thu Feb 11 05:40:45 2010
@@ -455,7 +455,13 @@
           break;
 
         case MSG_REPORT_SPLIT:
-          processSplitRegion(region, incomingMsgs[++i], incomingMsgs[++i]);
+          processSplitRegion(region, incomingMsgs[++i].getRegionInfo(),
+            incomingMsgs[++i].getRegionInfo());
+          break;
+        
+        case MSG_REPORT_SPLIT_INCLUDES_DAUGHTERS:
+          processSplitRegion(region, incomingMsgs[i].getDaughterA(),
+            incomingMsgs[i].getDaughterB());
           break;
 
         default:
@@ -497,14 +503,14 @@
    * @param splitB
    * @param returnMsgs
    */
-  private void processSplitRegion(HRegionInfo region, HMsg splitA, HMsg splitB) {
+  private void processSplitRegion(HRegionInfo region, HRegionInfo a, HRegionInfo b) {
     synchronized (master.regionManager) {
       // Cancel any actions pending for the affected region.
       // This prevents the master from sending a SPLIT message if the table
       // has already split by the region server. 
       master.regionManager.endActions(region.getRegionName());
-      assignSplitDaughter(splitA.getRegionInfo());
-      assignSplitDaughter(splitB.getRegionInfo());
+      assignSplitDaughter(a);
+      assignSplitDaughter(b);
       if (region.isMetaTable()) {
         // A meta region has split.
         master.regionManager.offlineMetaRegion(region.getStartKey());

Modified: hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=908846&r1=908845&r2=908846&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Thu Feb 11 05:40:45 2010
@@ -116,6 +116,7 @@
   static final Log LOG = LogFactory.getLog(HRegionServer.class);
   private static final HMsg REPORT_EXITING = new HMsg(Type.MSG_REPORT_EXITING);
   private static final HMsg REPORT_QUIESCED = new HMsg(Type.MSG_REPORT_QUIESCED);
+  private static final HMsg [] EMPTY_HMSG_ARRAY = new HMsg [] {};
   
   // Set when a report to the master comes back with a message asking us to
   // shutdown.  Also set by call to stop when debugging or running unit tests
@@ -151,8 +152,8 @@
     new ConcurrentHashMap<Integer, HRegion>();
 
   protected final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
-  private final List<HMsg> outboundMsgs =
-    Collections.synchronizedList(new ArrayList<HMsg>());
+  private final LinkedBlockingQueue<HMsg> outboundMsgs =
+    new LinkedBlockingQueue<HMsg>();
 
   final int numRetries;
   protected final int threadWakeFrequency;
@@ -436,7 +437,7 @@
         LOG.warn("No response from master on reportForDuty. Sleeping and " +
           "then trying again.");
       }
-      HMsg outboundArray[] = null;
+      List<HMsg> outboundMessages = new ArrayList<HMsg>();
       long lastMsg = 0;
       // Now ask master what it wants us to do and tell it what we have done
       for (int tries = 0; !stopRequested.get() && isHealthy();) {
@@ -457,10 +458,10 @@
           LOG.warn("unable to report to master for " + (now - lastMsg) +
             " milliseconds - retrying");
         }
-        // Send messages to the master IF this.msgInterval has elapsed OR if
-        // we have something to tell (and we didn't just fail sending master).
-        if ((now - lastMsg) >= msgInterval ||
-            ((outboundArray == null || outboundArray.length == 0) && !this.outboundMsgs.isEmpty())) {
+        // Drop into the send loop if msgInterval has elapsed or if something
+        // to send.  If we fail talking to the master, then we'll sleep below
+        // on poll of the outboundMsgs blockingqueue.
+        if ((now - lastMsg) >= msgInterval || !outboundMessages.isEmpty()) {
           try {
             doMetrics();
             MemoryUsage memory =
@@ -473,11 +474,13 @@
             }
             this.serverInfo.setLoad(hsl);
             this.requestCount.set(0);
-            outboundArray = getOutboundMsgs(outboundArray);
-            HMsg msgs[] = hbaseMaster.regionServerReport(
-              serverInfo, outboundArray, getMostLoadedRegions());
+            addOutboundMsgs(outboundMessages);
+            HMsg msgs[] = this.hbaseMaster.regionServerReport(
+              serverInfo, outboundMessages.toArray(EMPTY_HMSG_ARRAY),
+              getMostLoadedRegions());
             lastMsg = System.currentTimeMillis();
-            outboundArray = updateOutboundMsgs(outboundArray);
+            updateOutboundMsgs(outboundMessages);
+            outboundMessages.clear();
             if (this.quiesced.get() && onlineRegions.size() == 0) {
               // We've just told the master we're exiting because we aren't
               // serving any regions. So set the stop bit and exit.
@@ -589,9 +592,13 @@
             lastMsg = System.currentTimeMillis();
           }
         }
-        // Do some housekeeping before going to sleep
+        now = System.currentTimeMillis();
+        HMsg msg = this.outboundMsgs.poll((msgInterval - (now - lastMsg)), 
+          TimeUnit.MILLISECONDS);
+        // If we got something, add it to list of things to send.
+        if (msg != null) outboundMessages.add(msg);
+        // Do some housekeeping before going back around
         housekeeping();
-        sleeper.sleep(lastMsg);
       } // for
     } catch (Throwable t) {
       if (!checkOOME(t)) {
@@ -685,31 +692,39 @@
   }
 
   /*
-   * @param msgs Current outboundMsgs array
-   * @return Messages to send or returns current outboundMsgs if it already had
-   * content to send.
-   */
-  private HMsg [] getOutboundMsgs(final HMsg [] msgs) {
-    // If passed msgs are not null, means we haven't passed them to master yet.
-    if (msgs != null) return msgs;
-    synchronized(this.outboundMsgs) {
-      return this.outboundMsgs.toArray(new HMsg[outboundMsgs.size()]);
+   * Add to the passed <code>msgs</code> messages to pass to the master.
+   * @param msgs Current outboundMsgs array; we'll add messages to this List.
+   */
+  private void addOutboundMsgs(final List<HMsg> msgs) {
+    if (msgs.isEmpty()) {
+      this.outboundMsgs.drainTo(msgs);
+      return;
+    }
+    OUTER: for (HMsg m: this.outboundMsgs) {
+      for (HMsg mm: msgs) {
+        // Be careful don't add duplicates.
+        if (mm.equals(m)) {
+          continue OUTER;
+        }
+      }
+      msgs.add(m);
     }
   }
 
   /*
+   * Remove from this.outboundMsgs those messsages we sent the master.
    * @param msgs Messages we sent the master.
-   * @return Null
    */
-  private HMsg [] updateOutboundMsgs(final HMsg [] msgs) {
-    if (msgs == null) return null;
-    synchronized(this.outboundMsgs) {
-      for (HMsg m: msgs) {
-        int index = this.outboundMsgs.indexOf(m);
-        if (index != -1) this.outboundMsgs.remove(index);
+  private void updateOutboundMsgs(final List<HMsg> msgs) {
+    if (msgs.isEmpty()) return;
+    for (HMsg m: this.outboundMsgs) {
+      for (HMsg mm: msgs) {
+        if (mm.equals(m)) {
+          this.outboundMsgs.remove(m);
+          break;
+        }
       }
     }
-    return null;
   }
 
   /**
@@ -1203,8 +1218,7 @@
   }
 
   /*
-   * Run some housekeeping tasks before we go into 'hibernation' sleeping at
-   * the end of the main HRegionServer run loop.
+   * Run some housekeeping tasks.
    */
   private void housekeeping() {
     // If the todo list has > 0 messages, iterate looking for open region
@@ -1356,7 +1370,7 @@
 
   /* Add to the outbound message buffer */
   private void reportOpen(HRegionInfo region) {
-    outboundMsgs.add(new HMsg(HMsg.Type.MSG_REPORT_OPEN, region));
+    this.outboundMsgs.add(new HMsg(HMsg.Type.MSG_REPORT_OPEN, region));
   }
 
   /* Add to the outbound message buffer */
@@ -1366,7 +1380,7 @@
 
   /* Add to the outbound message buffer */
   private void reportClose(final HRegionInfo region, final byte[] message) {
-    outboundMsgs.add(new HMsg(HMsg.Type.MSG_REPORT_CLOSE, region, message));
+    this.outboundMsgs.add(new HMsg(HMsg.Type.MSG_REPORT_CLOSE, region, message));
   }
   
   /**
@@ -1381,12 +1395,11 @@
    */
   void reportSplit(HRegionInfo oldRegion, HRegionInfo newRegionA,
       HRegionInfo newRegionB) {
-    outboundMsgs.add(new HMsg(HMsg.Type.MSG_REPORT_SPLIT, oldRegion,
-      ("Daughters; " +
-        newRegionA.getRegionNameAsString() + ", " +
-        newRegionB.getRegionNameAsString()).getBytes()));
-    outboundMsgs.add(new HMsg(HMsg.Type.MSG_REPORT_OPEN, newRegionA));
-    outboundMsgs.add(new HMsg(HMsg.Type.MSG_REPORT_OPEN, newRegionB));
+    this.outboundMsgs.add(new HMsg(HMsg.Type.MSG_REPORT_SPLIT_INCLUDES_DAUGHTERS,
+      oldRegion, newRegionA, newRegionB,
+      Bytes.toBytes("Daughters; " +
+          newRegionA.getRegionNameAsString() + ", " +
+          newRegionB.getRegionNameAsString())));
   }
 
   //////////////////////////////////////////////////////////////////////////////
@@ -2344,7 +2357,7 @@
   /**
    * @return Queue to which you can add outbound messages.
    */
-  protected List<HMsg> getOutboundMsgs() {
+  protected LinkedBlockingQueue<HMsg> getOutboundMsgs() {
     return this.outboundMsgs;
   }
 

Modified: hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/TestHMsg.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/TestHMsg.java?rev=908846&r1=908845&r2=908846&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/TestHMsg.java (original)
+++ hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/TestHMsg.java Thu Feb 11 05:40:45 2010
@@ -19,13 +19,15 @@
  */
 package org.apache.hadoop.hbase;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.hadoop.hbase.util.Bytes;
-
 import junit.framework.TestCase;
 
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Writables;
+
 public class TestHMsg extends TestCase {
   public void testList() {
     List<HMsg> msgs = new ArrayList<HMsg>();
@@ -52,4 +54,29 @@
      new HRegionInfo(new HTableDescriptor(Bytes.toBytes("test")), b, b));
     assertNotSame(-1, msgs.indexOf(hmsg));
   }
+  
+  public void testSerialization() throws IOException {
+    // Check out new HMsg that carries two daughter split regions.
+    byte [] abytes = Bytes.toBytes("a");
+    byte [] bbytes = Bytes.toBytes("b");
+    byte [] parentbytes = Bytes.toBytes("parent");
+    HRegionInfo parent =
+      new HRegionInfo(new HTableDescriptor(Bytes.toBytes("parent")),
+      parentbytes, parentbytes);
+    // Assert simple HMsg serializes
+    HMsg hmsg = new HMsg(HMsg.Type.MSG_REGION_CLOSE, parent);
+    byte [] bytes = Writables.getBytes(hmsg);
+    HMsg close = (HMsg)Writables.getWritable(bytes, new HMsg());
+    assertTrue(close.equals(hmsg));
+    // Assert split serializes
+    HRegionInfo daughtera =
+      new HRegionInfo(new HTableDescriptor(Bytes.toBytes("a")), abytes, abytes);
+    HRegionInfo daughterb =
+      new HRegionInfo(new HTableDescriptor(Bytes.toBytes("b")), bbytes, bbytes);
+    HMsg splithmsg = new HMsg(HMsg.Type.MSG_REPORT_SPLIT_INCLUDES_DAUGHTERS,
+      parent, daughtera, daughterb, Bytes.toBytes("split"));
+    bytes = Writables.getBytes(splithmsg);
+    hmsg = (HMsg)Writables.getWritable(bytes, new HMsg());
+    assertTrue(splithmsg.equals(hmsg));
+  }
 }



Mime
View raw message