hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From d...@apache.org
Subject svn commit: r1510800 - in /hbase/branches/0.95/hbase-server/src: main/java/org/apache/hadoop/hbase/master/ test/java/org/apache/hadoop/hbase/master/
Date Mon, 05 Aug 2013 23:10:37 GMT
Author: ddas
Date: Mon Aug  5 23:10:37 2013
New Revision: 1510800

URL: http://svn.apache.org/r1510800
Log:
HBASE-9095. AssignmentManager's handleRegion should respect the single threaded nature of
the processing

Modified:
    hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
    hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMaster.java
    hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestZKBasedOpenCloseRegion.java

Modified: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java?rev=1510800&r1=1510799&r2=1510800&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
(original)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
Mon Aug  5 23:10:37 2013
@@ -159,6 +159,17 @@ public class AssignmentManager extends Z
 
   private final ExecutorService executorService;
 
+  // For unit tests, keep track of calls to ClosedRegionHandler
+  private Map<HRegionInfo, AtomicBoolean> closedRegionHandlerCalled = 
+      new HashMap<HRegionInfo, AtomicBoolean>();
+
+  // For unit tests, keep track of calls to OpenedRegionHandler
+  private Map<HRegionInfo, AtomicBoolean> openedRegionHandlerCalled = 
+      new HashMap<HRegionInfo, AtomicBoolean>();
+
+  // For unit tests, keep track of calls to SplitRegionHandler
+  private AtomicBoolean splitRegionHandlerCalled = new AtomicBoolean(false);
+
   //Thread pool executor service for timeout monitor
   private java.util.concurrent.ExecutorService threadPoolExecutorService;
 
@@ -836,8 +847,8 @@ public class AssignmentManager extends Z
             break;
           }
           // Run handler to do the rest of the SPLIT handling.
-          this.executorService.submit(new SplitRegionHandler(server, this,
-            regionState.getRegion(), sn, daughters));
+          new SplitRegionHandler(server, this, regionState.getRegion(), sn, daughters).process();
+          splitRegionHandlerCalled.set(true);
           break;
 
         case RS_ZK_REGION_MERGING:
@@ -872,8 +883,7 @@ public class AssignmentManager extends Z
               + merge_a + ", rs_b=" + merge_b);
           }
           // Run handler to do the rest of the MERGED handling.
-          this.executorService.submit(new MergedRegionHandler(
-            server, this, sn, mergeRegions));
+          new MergedRegionHandler(server, this, sn, mergeRegions).process();
           break;
 
         case M_ZK_REGION_CLOSING:
@@ -907,8 +917,8 @@ public class AssignmentManager extends Z
           regionState = regionStates.updateRegionState(rt, RegionState.State.CLOSED);
           if (regionState != null) {
             removeClosedRegion(regionState.getRegion());
-            this.executorService.submit(new ClosedRegionHandler(server,
-              this, regionState.getRegion()));
+            new ClosedRegionHandler(server, this, regionState.getRegion()).process();
+            closedRegionHandlerCalled.put(regionState.getRegion(), new AtomicBoolean(true));
           }
           break;
 
@@ -941,8 +951,7 @@ public class AssignmentManager extends Z
               // When there are more than one region server a new RS is selected as the
               // destination and the same is updated in the regionplan. (HBASE-5546)
               getRegionPlan(regionState.getRegion(), sn, true);
-              this.executorService.submit(new ClosedRegionHandler(server,
-                this, regionState.getRegion()));
+              new ClosedRegionHandler(server, this, regionState.getRegion()).process();
             }
           }
           break;
@@ -980,8 +989,9 @@ public class AssignmentManager extends Z
           regionState = regionStates.updateRegionState(rt, RegionState.State.OPEN);
           if (regionState != null) {
             failedOpenTracker.remove(encodedName); // reset the count, if any
-            this.executorService.submit(new OpenedRegionHandler(
-              server, this, regionState.getRegion(), sn, expectedVersion));
+            new OpenedRegionHandler(
+              server, this, regionState.getRegion(), sn, expectedVersion).process();
+            openedRegionHandlerCalled.put(regionState.getRegion(), new AtomicBoolean(true));
           }
           break;
 
@@ -993,6 +1003,32 @@ public class AssignmentManager extends Z
     }
   }
 
+  //For unit tests only
+  boolean wasClosedHandlerCalled(HRegionInfo hri) {
+    AtomicBoolean b = closedRegionHandlerCalled.get(hri);
+    //compareAndSet to be sure that unit tests don't see stale values. Means,
+    //we will return true exactly once unless the handler code resets to true
+    //this value.
+    return b == null ? false : b.compareAndSet(true, false);
+  }
+
+  //For unit tests only
+  boolean wasOpenedHandlerCalled(HRegionInfo hri) {
+    AtomicBoolean b = openedRegionHandlerCalled.get(hri);
+    //compareAndSet to be sure that unit tests don't see stale values. Means,
+    //we will return true exactly once unless the handler code resets to true
+    //this value.
+    return b == null ? false : b.compareAndSet(true, false);
+  }
+
+  //For unit tests only
+  boolean wasSplitHandlerCalled() {
+    //compareAndSet to be sure that unit tests don't see stale values. Means,
+    //we will return true exactly once unless the handler code resets to true
+    //this value.
+    return splitRegionHandlerCalled.compareAndSet(true, false);
+  }
+
   /**
    * @return Returns true if this RegionState is splittable; i.e. the
    * RegionState is currently in splitting state or pending_close or

Modified: hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMaster.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMaster.java?rev=1510800&r1=1510799&r2=1510800&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMaster.java
(original)
+++ hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMaster.java
Mon Aug  5 23:10:37 2013
@@ -24,16 +24,11 @@ import org.apache.hadoop.hbase.*;
 import org.apache.hadoop.hbase.catalog.MetaReader;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.executor.EventHandler;
-import org.apache.hadoop.hbase.executor.EventHandler.EventHandlerListener;
-import org.apache.hadoop.hbase.executor.EventType;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 
 import java.io.IOException;
 import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
 
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -86,35 +81,27 @@ public class TestMaster {
         tableRegions.get(0).getFirst().getEndKey());
 
     // Now trigger a split and stop when the split is in progress
-    CountDownLatch split = new CountDownLatch(1);
-    CountDownLatch proceed = new CountDownLatch(1);
-    RegionSplitListener list = new RegionSplitListener(split, proceed);
-    cluster.getMaster().executorService.
-      registerListener(EventType.RS_ZK_REGION_SPLIT, list);
-
     LOG.info("Splitting table");
     TEST_UTIL.getHBaseAdmin().split(TABLENAME);
     LOG.info("Waiting for split result to be about to open");
-    split.await(60, TimeUnit.SECONDS);
-    try {
-      LOG.info("Making sure we can call getTableRegions while opening");
-      tableRegions = MetaReader.getTableRegionsAndLocations(m.getCatalogTracker(),
+    while (!m.assignmentManager.wasSplitHandlerCalled()) {
+      Thread.sleep(100);
+    }
+    LOG.info("Making sure we can call getTableRegions while opening");
+    tableRegions = MetaReader.getTableRegionsAndLocations(m.getCatalogTracker(),
         TABLENAME, false);
 
-      LOG.info("Regions: " + Joiner.on(',').join(tableRegions));
-      // We have three regions because one is split-in-progress
-      assertEquals(3, tableRegions.size());
-      LOG.info("Making sure we can call getTableRegionClosest while opening");
-      Pair<HRegionInfo, ServerName> pair =
+    LOG.info("Regions: " + Joiner.on(',').join(tableRegions));
+    // We have three regions because one is split-in-progress
+    assertEquals(3, tableRegions.size());
+    LOG.info("Making sure we can call getTableRegionClosest while opening");
+    Pair<HRegionInfo, ServerName> pair =
         m.getTableRegionForRow(TABLENAME, Bytes.toBytes("cde"));
-      LOG.info("Result is: " + pair);
-      Pair<HRegionInfo, ServerName> tableRegionFromName =
+    LOG.info("Result is: " + pair);
+    Pair<HRegionInfo, ServerName> tableRegionFromName =
         MetaReader.getRegion(m.getCatalogTracker(),
             pair.getFirst().getRegionName());
-      assertEquals(tableRegionFromName.getFirst(), pair.getFirst());
-    } finally {
-      proceed.countDown();
-    }
+    assertEquals(tableRegionFromName.getFirst(), pair.getFirst());
   }
 
   @Test
@@ -175,33 +162,5 @@ public class TestMaster {
       TEST_UTIL.deleteTable(tableName);
     }
   }
-
-  static class RegionSplitListener implements EventHandlerListener {
-    CountDownLatch split, proceed;
-
-    public RegionSplitListener(CountDownLatch split, CountDownLatch proceed) {
-      this.split = split;
-      this.proceed = proceed;
-    }
-
-    @Override
-    public void afterProcess(EventHandler event) {
-      if (event.getEventType() != EventType.RS_ZK_REGION_SPLIT) {
-        return;
-      }
-      try {
-        split.countDown();
-        proceed.await(60, TimeUnit.SECONDS);
-      } catch (InterruptedException ie) {
-        throw new RuntimeException(ie);
-      }
-      return;
-    }
-
-    @Override
-    public void beforeProcess(EventHandler event) {
-    }
-  }
-
 }
 

Modified: hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestZKBasedOpenCloseRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestZKBasedOpenCloseRegion.java?rev=1510800&r1=1510799&r2=1510800&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestZKBasedOpenCloseRegion.java
(original)
+++ hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestZKBasedOpenCloseRegion.java
Mon Aug  5 23:10:37 2013
@@ -26,7 +26,6 @@ import static org.junit.Assert.fail;
 
 import java.io.IOException;
 import java.util.Collection;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -43,10 +42,6 @@ import org.apache.hadoop.hbase.client.Re
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Durability;
-import org.apache.hadoop.hbase.executor.EventHandler;
-import org.apache.hadoop.hbase.executor.EventHandler.EventHandlerListener;
-import org.apache.hadoop.hbase.executor.EventType;
-import org.apache.hadoop.hbase.master.handler.TotesHRegionInfo;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -116,29 +111,14 @@ public class TestZKBasedOpenCloseRegion 
     HRegionInfo hri = getNonMetaRegion(ProtobufUtil.getOnlineRegions(regionServer));
     LOG.debug("Asking RS to close region " + hri.getRegionNameAsString());
 
-    AtomicBoolean closeEventProcessed = new AtomicBoolean(false);
-    AtomicBoolean reopenEventProcessed = new AtomicBoolean(false);
-
-    EventHandlerListener closeListener =
-      new ReopenEventListener(hri.getRegionNameAsString(),
-          closeEventProcessed, EventType.RS_ZK_REGION_CLOSED);
-    cluster.getMaster().executorService.
-      registerListener(EventType.RS_ZK_REGION_CLOSED, closeListener);
-
-    EventHandlerListener openListener =
-      new ReopenEventListener(hri.getRegionNameAsString(),
-          reopenEventProcessed, EventType.RS_ZK_REGION_OPENED);
-    cluster.getMaster().executorService.
-      registerListener(EventType.RS_ZK_REGION_OPENED, openListener);
-
     LOG.info("Unassign " + hri.getRegionNameAsString());
     cluster.getMaster().assignmentManager.unassign(hri);
 
-    while (!closeEventProcessed.get()) {
+    while (!cluster.getMaster().assignmentManager.wasClosedHandlerCalled(hri)) {
       Threads.sleep(100);
     }
 
-    while (!reopenEventProcessed.get()) {
+    while (!cluster.getMaster().assignmentManager.wasOpenedHandlerCalled(hri)) {
       Threads.sleep(100);
     }
 
@@ -157,83 +137,6 @@ public class TestZKBasedOpenCloseRegion 
     return hri;
   }
 
-  public static class ReopenEventListener implements EventHandlerListener {
-    private static final Log LOG = LogFactory.getLog(ReopenEventListener.class);
-    String regionName;
-    AtomicBoolean eventProcessed;
-    EventType eventType;
-
-    public ReopenEventListener(String regionName,
-        AtomicBoolean eventProcessed, EventType eventType) {
-      this.regionName = regionName;
-      this.eventProcessed = eventProcessed;
-      this.eventType = eventType;
-    }
-
-    @Override
-    public void beforeProcess(EventHandler event) {
-      if(event.getEventType() == eventType) {
-        LOG.info("Received " + eventType + " and beginning to process it");
-      }
-    }
-
-    @Override
-    public void afterProcess(EventHandler event) {
-      LOG.info("afterProcess(" + event + ")");
-      if(event.getEventType() == eventType) {
-        LOG.info("Finished processing " + eventType);
-        String regionName = "";
-        if(eventType == EventType.RS_ZK_REGION_OPENED) {
-          TotesHRegionInfo hriCarrier = (TotesHRegionInfo)event;
-          regionName = hriCarrier.getHRegionInfo().getRegionNameAsString();
-        } else if(eventType == EventType.RS_ZK_REGION_CLOSED) {
-          TotesHRegionInfo hriCarrier = (TotesHRegionInfo)event;
-          regionName = hriCarrier.getHRegionInfo().getRegionNameAsString();
-        }
-        if(this.regionName.equals(regionName)) {
-          eventProcessed.set(true);
-        }
-        synchronized(eventProcessed) {
-          eventProcessed.notifyAll();
-        }
-      }
-    }
-  }
-
-  public static class CloseRegionEventListener implements EventHandlerListener {
-    private static final Log LOG = LogFactory.getLog(CloseRegionEventListener.class);
-    String regionToClose;
-    AtomicBoolean closeEventProcessed;
-
-    public CloseRegionEventListener(String regionToClose,
-        AtomicBoolean closeEventProcessed) {
-      this.regionToClose = regionToClose;
-      this.closeEventProcessed = closeEventProcessed;
-    }
-
-    @Override
-    public void afterProcess(EventHandler event) {
-      LOG.info("afterProcess(" + event + ")");
-      if(event.getEventType() == EventType.RS_ZK_REGION_CLOSED) {
-        LOG.info("Finished processing CLOSE REGION");
-        TotesHRegionInfo hriCarrier = (TotesHRegionInfo)event;
-        if (regionToClose.equals(hriCarrier.getHRegionInfo().getRegionNameAsString())) {
-          LOG.info("Setting closeEventProcessed flag");
-          closeEventProcessed.set(true);
-        } else {
-          LOG.info("Region to close didn't match");
-        }
-      }
-    }
-
-    @Override
-    public void beforeProcess(EventHandler event) {
-      if(event.getEventType() == EventType.M_RS_CLOSE_REGION) {
-        LOG.info("Received CLOSE RPC and beginning to process it");
-      }
-    }
-  }
-
   /**
    * This test shows how a region won't be able to be assigned to a RS
    * if it's already "processing" it.
@@ -253,13 +156,6 @@ public class TestZKBasedOpenCloseRegion 
     // fake that hr1 is processing the region
     hr1.getRegionsInTransitionInRS().putIfAbsent(hri.getEncodedNameAsBytes(), true);
 
-    AtomicBoolean reopenEventProcessed = new AtomicBoolean(false);
-    EventHandlerListener openListener =
-      new ReopenEventListener(hri.getRegionNameAsString(),
-          reopenEventProcessed, EventType.RS_ZK_REGION_OPENED);
-    cluster.getMaster().executorService.
-      registerListener(EventType.RS_ZK_REGION_OPENED, openListener);
-
     // now ask the master to move the region to hr1, will fail
     TEST_UTIL.getHBaseAdmin().move(hri.getEncodedNameAsBytes(),
         Bytes.toBytes(hr1.getServerName().toString()));
@@ -269,22 +165,14 @@ public class TestZKBasedOpenCloseRegion 
 
     // remove the block and reset the boolean
     hr1.getRegionsInTransitionInRS().remove(hri.getEncodedNameAsBytes());
-    reopenEventProcessed.set(false);
 
     // now try moving a region when there is no region in transition.
     hri = getNonMetaRegion(ProtobufUtil.getOnlineRegions(hr1));
 
-    openListener =
-      new ReopenEventListener(hri.getRegionNameAsString(),
-          reopenEventProcessed, EventType.RS_ZK_REGION_OPENED);
-
-    cluster.getMaster().executorService.
-      registerListener(EventType.RS_ZK_REGION_OPENED, openListener);
-
     TEST_UTIL.getHBaseAdmin().move(hri.getEncodedNameAsBytes(),
         Bytes.toBytes(hr0.getServerName().toString()));
 
-    while (!reopenEventProcessed.get()) {
+    while (!cluster.getMaster().assignmentManager.wasOpenedHandlerCalled(hri)) {
       Threads.sleep(100);
     }
 
@@ -304,15 +192,9 @@ public class TestZKBasedOpenCloseRegion 
     HRegionInfo hri = getNonMetaRegion(ProtobufUtil.getOnlineRegions(regionServer));
     LOG.debug("Asking RS to close region " + hri.getRegionNameAsString());
 
-    AtomicBoolean closeEventProcessed = new AtomicBoolean(false);
-    EventHandlerListener listener =
-      new CloseRegionEventListener(hri.getRegionNameAsString(),
-          closeEventProcessed);
-    cluster.getMaster().executorService.registerListener(EventType.RS_ZK_REGION_CLOSED, listener);
-
     cluster.getMaster().assignmentManager.unassign(hri);
 
-    while (!closeEventProcessed.get()) {
+    while (!cluster.getMaster().assignmentManager.wasClosedHandlerCalled(hri)) {
       Threads.sleep(100);
     }
     LOG.info("Done with testCloseRegion");



Mime
View raw message