Return-Path: Delivered-To: apmail-hadoop-hbase-commits-archive@minotaur.apache.org Received: (qmail 13137 invoked from network); 11 Feb 2010 05:41:16 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.3) by minotaur.apache.org with SMTP; 11 Feb 2010 05:41:16 -0000 Received: (qmail 23096 invoked by uid 500); 11 Feb 2010 05:41:16 -0000 Delivered-To: apmail-hadoop-hbase-commits-archive@hadoop.apache.org Received: (qmail 23026 invoked by uid 500); 11 Feb 2010 05:41:15 -0000 Mailing-List: contact hbase-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: hbase-dev@hadoop.apache.org Delivered-To: mailing list hbase-commits@hadoop.apache.org Received: (qmail 22999 invoked by uid 99); 11 Feb 2010 05:41:14 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 11 Feb 2010 05:41:14 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 11 Feb 2010 05:41:06 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 4C24823888CC; Thu, 11 Feb 2010 05:40:46 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r908846 - in /hadoop/hbase/branches/0.20: ./ src/java/org/apache/hadoop/hbase/ src/java/org/apache/hadoop/hbase/master/ src/java/org/apache/hadoop/hbase/regionserver/ src/test/org/apache/hadoop/hbase/ Date: Thu, 11 Feb 2010 05:40:46 -0000 To: hbase-commits@hadoop.apache.org From: stack@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20100211054046.4C24823888CC@eris.apache.org> Author: stack Date: Thu Feb 11 05:40:45 2010 New Revision: 908846 URL: http://svn.apache.org/viewvc?rev=908846&view=rev Log: HBASE-2190 HRS should report to master when HMsg are available Modified: 
hadoop/hbase/branches/0.20/CHANGES.txt hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/HMsg.java hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/master/ServerManager.java hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/TestHMsg.java Modified: hadoop/hbase/branches/0.20/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/CHANGES.txt?rev=908846&r1=908845&r2=908846&view=diff ============================================================================== --- hadoop/hbase/branches/0.20/CHANGES.txt (original) +++ hadoop/hbase/branches/0.20/CHANGES.txt Thu Feb 11 05:40:45 2010 @@ -10,6 +10,7 @@ HBASE-2180 Bad read performance from synchronizing hfile.fddatainputstream HBASE-2185 Add html version of default hbase-site.xml (Kay Kay via Stack) HBASE-2189 HCM trashes meta cache even when not needed + HBASE-2190 HRS should report to master when HMsg are available NEW FEATURES Modified: hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/HMsg.java URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/HMsg.java?rev=908846&r1=908845&r2=908846&view=diff ============================================================================== --- hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/HMsg.java (original) +++ hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/HMsg.java Thu Feb 11 05:40:45 2010 @@ -83,6 +83,7 @@ * * Note that this message is immediately followed by two MSG_REPORT_OPEN * messages, one for each of the new regions resulting from the split + * @deprecated See MSG_REPORT_SPLIT_INCLUDES_DAUGHTERS */ MSG_REPORT_SPLIT, @@ -108,11 +109,21 @@ * Run Major Compaction */ MSG_REGION_MAJOR_COMPACT, + + /** + * Region server split the region associated with this message. 
+ * + * It's like MSG_REPORT_SPLIT only it carries the daughters in the message + * rather than send them individually in MSG_REPORT_OPEN messages. + */ + MSG_REPORT_SPLIT_INCLUDES_DAUGHTERS, } private Type type = null; private HRegionInfo info = null; private byte[] message = null; + private HRegionInfo daughterA = null; + private HRegionInfo daughterB = null; /** Default constructor. Used during deserialization */ public HMsg() { @@ -145,6 +156,21 @@ * @param msg Optional message (Stringified exception, etc.) */ public HMsg(final HMsg.Type type, final HRegionInfo hri, final byte[] msg) { + this(type, hri, null, null, msg); + } + + /** + * Construct a message with the specified message and HRegionInfo + * + * @param type Message type + * @param hri Region to which message type applies. Cannot be + * null. If no info associated, use the other constructor. + * @param daughterA + * @param daughterB + * @param msg Optional message (Stringified exception, etc.) + */ + public HMsg(final HMsg.Type type, final HRegionInfo hri, + final HRegionInfo daughterA, final HRegionInfo daughterB, final byte[] msg) { if (type == null) { throw new NullPointerException("Message type cannot be null"); } @@ -154,6 +180,8 @@ } this.info = hri; this.message = msg; + this.daughterA = daughterA; + this.daughterB = daughterB; } /** @@ -182,6 +210,22 @@ } /** + * @return First daughter if Type is MSG_REPORT_SPLIT_INCLUDES_DAUGHTERS else + * null + */ + public HRegionInfo getDaughterA() { + return this.daughterA; + } + + /** + * @return Second daughter if Type is MSG_REPORT_SPLIT_INCLUDES_DAUGHTERS else + * null + */ + public HRegionInfo getDaughterB() { + return this.daughterB; + } + + /** * @see java.lang.Object#toString() */ @Override @@ -247,6 +291,10 @@ out.writeBoolean(true); Bytes.writeByteArray(out, this.message); } + if (this.type.equals(Type.MSG_REPORT_SPLIT_INCLUDES_DAUGHTERS)) { + this.daughterA.write(out); + this.daughterB.write(out); + } } /** @@ -260,5 +308,11 @@ if (hasMessage) { 
this.message = Bytes.readByteArray(in); } + if (this.type.equals(Type.MSG_REPORT_SPLIT_INCLUDES_DAUGHTERS)) { + this.daughterA = new HRegionInfo(); + this.daughterB = new HRegionInfo(); + this.daughterA.readFields(in); + this.daughterB.readFields(in); + } } -} +} \ No newline at end of file Modified: hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/master/ServerManager.java URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/master/ServerManager.java?rev=908846&r1=908845&r2=908846&view=diff ============================================================================== --- hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/master/ServerManager.java (original) +++ hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/master/ServerManager.java Thu Feb 11 05:40:45 2010 @@ -455,7 +455,13 @@ break; case MSG_REPORT_SPLIT: - processSplitRegion(region, incomingMsgs[++i], incomingMsgs[++i]); + processSplitRegion(region, incomingMsgs[++i].getRegionInfo(), + incomingMsgs[++i].getRegionInfo()); + break; + + case MSG_REPORT_SPLIT_INCLUDES_DAUGHTERS: + processSplitRegion(region, incomingMsgs[i].getDaughterA(), + incomingMsgs[i].getDaughterB()); break; default: @@ -497,14 +503,14 @@ * @param splitB * @param returnMsgs */ - private void processSplitRegion(HRegionInfo region, HMsg splitA, HMsg splitB) { + private void processSplitRegion(HRegionInfo region, HRegionInfo a, HRegionInfo b) { synchronized (master.regionManager) { // Cancel any actions pending for the affected region. // This prevents the master from sending a SPLIT message if the table // has already split by the region server. master.regionManager.endActions(region.getRegionName()); - assignSplitDaughter(splitA.getRegionInfo()); - assignSplitDaughter(splitB.getRegionInfo()); + assignSplitDaughter(a); + assignSplitDaughter(b); if (region.isMetaTable()) { // A meta region has split. 
master.regionManager.offlineMetaRegion(region.getStartKey()); Modified: hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=908846&r1=908845&r2=908846&view=diff ============================================================================== --- hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original) +++ hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Thu Feb 11 05:40:45 2010 @@ -116,6 +116,7 @@ static final Log LOG = LogFactory.getLog(HRegionServer.class); private static final HMsg REPORT_EXITING = new HMsg(Type.MSG_REPORT_EXITING); private static final HMsg REPORT_QUIESCED = new HMsg(Type.MSG_REPORT_QUIESCED); + private static final HMsg [] EMPTY_HMSG_ARRAY = new HMsg [] {}; // Set when a report to the master comes back with a message asking us to // shutdown. Also set by call to stop when debugging or running unit tests @@ -151,8 +152,8 @@ new ConcurrentHashMap(); protected final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); - private final List outboundMsgs = - Collections.synchronizedList(new ArrayList()); + private final LinkedBlockingQueue outboundMsgs = + new LinkedBlockingQueue(); final int numRetries; protected final int threadWakeFrequency; @@ -436,7 +437,7 @@ LOG.warn("No response from master on reportForDuty. 
Sleeping and " + "then trying again."); } - HMsg outboundArray[] = null; + List outboundMessages = new ArrayList(); long lastMsg = 0; // Now ask master what it wants us to do and tell it what we have done for (int tries = 0; !stopRequested.get() && isHealthy();) { @@ -457,10 +458,10 @@ LOG.warn("unable to report to master for " + (now - lastMsg) + " milliseconds - retrying"); } - // Send messages to the master IF this.msgInterval has elapsed OR if - // we have something to tell (and we didn't just fail sending master). - if ((now - lastMsg) >= msgInterval || - ((outboundArray == null || outboundArray.length == 0) && !this.outboundMsgs.isEmpty())) { + // Drop into the send loop if msgInterval has elapsed or if something + // to send. If we fail talking to the master, then we'll sleep below + // on poll of the outboundMsgs blockingqueue. + if ((now - lastMsg) >= msgInterval || !outboundMessages.isEmpty()) { try { doMetrics(); MemoryUsage memory = @@ -473,11 +474,13 @@ } this.serverInfo.setLoad(hsl); this.requestCount.set(0); - outboundArray = getOutboundMsgs(outboundArray); - HMsg msgs[] = hbaseMaster.regionServerReport( - serverInfo, outboundArray, getMostLoadedRegions()); + addOutboundMsgs(outboundMessages); + HMsg msgs[] = this.hbaseMaster.regionServerReport( + serverInfo, outboundMessages.toArray(EMPTY_HMSG_ARRAY), + getMostLoadedRegions()); lastMsg = System.currentTimeMillis(); - outboundArray = updateOutboundMsgs(outboundArray); + updateOutboundMsgs(outboundMessages); + outboundMessages.clear(); if (this.quiesced.get() && onlineRegions.size() == 0) { // We've just told the master we're exiting because we aren't // serving any regions. So set the stop bit and exit. @@ -589,9 +592,13 @@ lastMsg = System.currentTimeMillis(); } } - // Do some housekeeping before going to sleep + now = System.currentTimeMillis(); + HMsg msg = this.outboundMsgs.poll((msgInterval - (now - lastMsg)), + TimeUnit.MILLISECONDS); + // If we got something, add it to list of things to send. 
+ if (msg != null) outboundMessages.add(msg); + // Do some housekeeping before going back around housekeeping(); - sleeper.sleep(lastMsg); } // for } catch (Throwable t) { if (!checkOOME(t)) { @@ -685,31 +692,39 @@ } /* - * @param msgs Current outboundMsgs array - * @return Messages to send or returns current outboundMsgs if it already had - * content to send. - */ - private HMsg [] getOutboundMsgs(final HMsg [] msgs) { - // If passed msgs are not null, means we haven't passed them to master yet. - if (msgs != null) return msgs; - synchronized(this.outboundMsgs) { - return this.outboundMsgs.toArray(new HMsg[outboundMsgs.size()]); + * Add to the passed msgs messages to pass to the master. + * @param msgs Current outboundMsgs array; we'll add messages to this List. + */ + private void addOutboundMsgs(final List msgs) { + if (msgs.isEmpty()) { + this.outboundMsgs.drainTo(msgs); + return; + } + OUTER: for (HMsg m: this.outboundMsgs) { + for (HMsg mm: msgs) { + // Be careful not to add duplicates. + if (mm.equals(m)) { + continue OUTER; + } + } + msgs.add(m); } } /* + * Remove from this.outboundMsgs those messages we sent the master. * @param msgs Messages we sent the master. - * @return Null */ - private HMsg [] updateOutboundMsgs(final HMsg [] msgs) { - if (msgs == null) return null; - synchronized(this.outboundMsgs) { - for (HMsg m: msgs) { - int index = this.outboundMsgs.indexOf(m); - if (index != -1) this.outboundMsgs.remove(index); + private void updateOutboundMsgs(final List msgs) { + if (msgs.isEmpty()) return; + for (HMsg m: this.outboundMsgs) { + for (HMsg mm: msgs) { + if (mm.equals(m)) { + this.outboundMsgs.remove(m); + break; + } } } - return null; } /** @@ -1203,8 +1218,7 @@ } /* - * Run some housekeeping tasks before we go into 'hibernation' sleeping at - * the end of the main HRegionServer run loop. + * Run some housekeeping tasks. 
*/ private void housekeeping() { // If the todo list has > 0 messages, iterate looking for open region @@ -1356,7 +1370,7 @@ /* Add to the outbound message buffer */ private void reportOpen(HRegionInfo region) { - outboundMsgs.add(new HMsg(HMsg.Type.MSG_REPORT_OPEN, region)); + this.outboundMsgs.add(new HMsg(HMsg.Type.MSG_REPORT_OPEN, region)); } /* Add to the outbound message buffer */ @@ -1366,7 +1380,7 @@ /* Add to the outbound message buffer */ private void reportClose(final HRegionInfo region, final byte[] message) { - outboundMsgs.add(new HMsg(HMsg.Type.MSG_REPORT_CLOSE, region, message)); + this.outboundMsgs.add(new HMsg(HMsg.Type.MSG_REPORT_CLOSE, region, message)); } /** @@ -1381,12 +1395,11 @@ */ void reportSplit(HRegionInfo oldRegion, HRegionInfo newRegionA, HRegionInfo newRegionB) { - outboundMsgs.add(new HMsg(HMsg.Type.MSG_REPORT_SPLIT, oldRegion, - ("Daughters; " + - newRegionA.getRegionNameAsString() + ", " + - newRegionB.getRegionNameAsString()).getBytes())); - outboundMsgs.add(new HMsg(HMsg.Type.MSG_REPORT_OPEN, newRegionA)); - outboundMsgs.add(new HMsg(HMsg.Type.MSG_REPORT_OPEN, newRegionB)); + this.outboundMsgs.add(new HMsg(HMsg.Type.MSG_REPORT_SPLIT_INCLUDES_DAUGHTERS, + oldRegion, newRegionA, newRegionB, + Bytes.toBytes("Daughters; " + + newRegionA.getRegionNameAsString() + ", " + + newRegionB.getRegionNameAsString()))); } ////////////////////////////////////////////////////////////////////////////// @@ -2344,7 +2357,7 @@ /** * @return Queue to which you can add outbound messages. 
*/ - protected List getOutboundMsgs() { + protected LinkedBlockingQueue getOutboundMsgs() { return this.outboundMsgs; } Modified: hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/TestHMsg.java URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/TestHMsg.java?rev=908846&r1=908845&r2=908846&view=diff ============================================================================== --- hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/TestHMsg.java (original) +++ hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/TestHMsg.java Thu Feb 11 05:40:45 2010 @@ -19,13 +19,15 @@ */ package org.apache.hadoop.hbase; +import java.io.IOException; import java.util.ArrayList; import java.util.List; -import org.apache.hadoop.hbase.util.Bytes; - import junit.framework.TestCase; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Writables; + public class TestHMsg extends TestCase { public void testList() { List msgs = new ArrayList(); @@ -52,4 +54,29 @@ new HRegionInfo(new HTableDescriptor(Bytes.toBytes("test")), b, b)); assertNotSame(-1, msgs.indexOf(hmsg)); } + + public void testSerialization() throws IOException { + // Check out new HMsg that carries two daughter split regions. 
+ byte [] abytes = Bytes.toBytes("a"); + byte [] bbytes = Bytes.toBytes("b"); + byte [] parentbytes = Bytes.toBytes("parent"); + HRegionInfo parent = + new HRegionInfo(new HTableDescriptor(Bytes.toBytes("parent")), + parentbytes, parentbytes); + // Assert simple HMsg serializes + HMsg hmsg = new HMsg(HMsg.Type.MSG_REGION_CLOSE, parent); + byte [] bytes = Writables.getBytes(hmsg); + HMsg close = (HMsg)Writables.getWritable(bytes, new HMsg()); + assertTrue(close.equals(hmsg)); + // Assert split serializes + HRegionInfo daughtera = + new HRegionInfo(new HTableDescriptor(Bytes.toBytes("a")), abytes, abytes); + HRegionInfo daughterb = + new HRegionInfo(new HTableDescriptor(Bytes.toBytes("b")), bbytes, bbytes); + HMsg splithmsg = new HMsg(HMsg.Type.MSG_REPORT_SPLIT_INCLUDES_DAUGHTERS, + parent, daughtera, daughterb, Bytes.toBytes("split")); + bytes = Writables.getBytes(splithmsg); + hmsg = (HMsg)Writables.getWritable(bytes, new HMsg()); + assertTrue(splithmsg.equals(hmsg)); + } }