Return-Path: X-Original-To: apmail-hbase-commits-archive@www.apache.org Delivered-To: apmail-hbase-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id A0EB39C9C for ; Mon, 23 Apr 2012 18:12:46 +0000 (UTC) Received: (qmail 55421 invoked by uid 500); 23 Apr 2012 18:12:46 -0000 Delivered-To: apmail-hbase-commits-archive@hbase.apache.org Received: (qmail 55315 invoked by uid 500); 23 Apr 2012 18:12:46 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 55286 invoked by uid 99); 23 Apr 2012 18:12:45 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 23 Apr 2012 18:12:45 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 23 Apr 2012 18:12:40 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 6ACF723889EC for ; Mon, 23 Apr 2012 18:12:20 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1329358 [4/5] - in /hbase/trunk: security/src/main/java/org/apache/hadoop/hbase/ipc/ src/main/java/org/apache/hadoop/hbase/catalog/ src/main/java/org/apache/hadoop/hbase/client/ src/main/java/org/apache/hadoop/hbase/ipc/ src/main/java/org/... Date: Mon, 23 Apr 2012 18:12:18 -0000 To: commits@hbase.apache.org From: stack@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20120423181220.6ACF723889EC@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1329358&r1=1329357&r2=1329358&view=diff ============================================================================== --- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original) +++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Mon Apr 23 18:12:16 2012 @@ -74,7 +74,6 @@ import org.apache.hadoop.hbase.NotServin import org.apache.hadoop.hbase.RemoteExceptionHandler; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.Stoppable; -import org.apache.hadoop.hbase.TableDescriptors; import org.apache.hadoop.hbase.UnknownRowLockException; import org.apache.hadoop.hbase.UnknownScannerException; import org.apache.hadoop.hbase.YouAreDeadException; @@ -82,7 +81,9 @@ import org.apache.hadoop.hbase.catalog.C import org.apache.hadoop.hbase.catalog.MetaEditor; import org.apache.hadoop.hbase.catalog.MetaReader; import org.apache.hadoop.hbase.client.Action; +import org.apache.hadoop.hbase.client.AdminProtocol; import org.apache.hadoop.hbase.client.Append; +import org.apache.hadoop.hbase.client.ClientProtocol; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.HConnectionManager; @@ -117,11 +118,7 @@ import org.apache.hadoop.hbase.ipc.Invoc import org.apache.hadoop.hbase.ipc.ProtocolSignature; import org.apache.hadoop.hbase.ipc.RpcServer; import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException; -import org.apache.hadoop.hbase.protobuf.ClientProtocol; import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress; -import org.apache.hadoop.hbase.regionserver.handler.CloseMetaHandler; -import org.apache.hadoop.hbase.regionserver.handler.CloseRegionHandler; -import org.apache.hadoop.hbase.regionserver.handler.CloseRootHandler; import org.apache.hadoop.hbase.regionserver.handler.OpenMetaHandler; import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler; import org.apache.hadoop.hbase.regionserver.handler.OpenRootHandler; @@ -185,12 +182,6 @@ public class HRegionServer extends Regio private boolean useHBaseChecksum; // verify hbase checksums? private Path rootDir; - //RegionName vs current action in progress - //true - if open region action in progress - //false - if close region action in progress - private final ConcurrentSkipListMap regionsInTransitionInRS = - new ConcurrentSkipListMap(Bytes.BYTES_COMPARATOR); - protected final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); final int numRetries; @@ -228,9 +219,6 @@ public class HRegionServer extends Regio @SuppressWarnings("unused") private RegionServerDynamicMetrics dynamicMetrics; - // Compactions - public CompactSplitThread compactSplitThread; - /* * Check for compactions requests. */ @@ -250,9 +238,6 @@ public class HRegionServer extends Regio // master address manager and watcher private MasterAddressTracker masterAddressManager; - // catalog tracker - private CatalogTracker catalogTracker; - // Cluster Status Tracker private ClusterStatusTracker clusterStatusTracker; @@ -264,14 +249,6 @@ public class HRegionServer extends Regio private final int rpcTimeout; - // Instance of the hbase executor service. - private ExecutorService service; - @SuppressWarnings("unused") - - // Replication services. If no replication, this handler will be null. - private ReplicationSourceService replicationSourceHandler; - private ReplicationSinkService replicationSinkHandler; - private final RegionServerAccounting regionServerAccounting; // Cache configuration and block cache reference @@ -297,18 +274,6 @@ public class HRegionServer extends Regio private final long startcode; /** - * Go here to get table descriptors. - */ - private TableDescriptors tableDescriptors; - - /* - * Strings to be used in forming the exception message for - * RegionsAlreadyInTransitionException. - */ - private static final String OPEN = "OPEN"; - private static final String CLOSE = "CLOSE"; - - /** * MX Bean for RegionServerInfo */ private ObjectName mxBean = null; @@ -370,7 +335,7 @@ public class HRegionServer extends Regio this.rpcServer = HBaseRPC.getServer(this, new Class[]{HRegionInterface.class, ClientProtocol.class, - HBaseRPCErrorHandler.class, + AdminProtocol.class, HBaseRPCErrorHandler.class, OnlineRegions.class}, initialIsa.getHostName(), // BindAddress is IP we got for this server. initialIsa.getPort(), @@ -2490,19 +2455,6 @@ public class HRegionServer extends Regio return RegionOpeningState.OPENED; } - private void checkIfRegionInTransition(HRegionInfo region, - String currentAction) throws RegionAlreadyInTransitionException { - byte[] encodedName = region.getEncodedNameAsBytes(); - if (this.regionsInTransitionInRS.containsKey(encodedName)) { - boolean openAction = this.regionsInTransitionInRS.get(encodedName); - // The below exception message will be used in master. - throw new RegionAlreadyInTransitionException("Received:" + currentAction + - " for the region:" + region.getRegionNameAsString() + - " ,which we are already trying to " + - (openAction ? OPEN : CLOSE)+ "."); - } - } - @Override @QosPriority(priority=HIGH_QOS) public void openRegions(List regions) @@ -2560,54 +2512,6 @@ public class HRegionServer extends Regio } /** - * @param region Region to close - * @param abort True if we are aborting - * @param zk True if we are to update zk about the region close; if the close - * was orchestrated by master, then update zk. If the close is being run by - * the regionserver because its going down, don't update zk. - * @return True if closed a region. - */ - protected boolean closeRegion(HRegionInfo region, final boolean abort, - final boolean zk) { - return closeRegion(region, abort, zk, -1); - } - - - /** - * @param region Region to close - * @param abort True if we are aborting - * @param zk True if we are to update zk about the region close; if the close - * was orchestrated by master, then update zk. If the close is being run by - * the regionserver because its going down, don't update zk. - * @param versionOfClosingNode - * the version of znode to compare when RS transitions the znode from - * CLOSING state. - * @return True if closed a region. - */ - protected boolean closeRegion(HRegionInfo region, final boolean abort, - final boolean zk, final int versionOfClosingNode) { - if (this.regionsInTransitionInRS.containsKey(region.getEncodedNameAsBytes())) { - LOG.warn("Received close for region we are already opening or closing; " + - region.getEncodedName()); - return false; - } - this.regionsInTransitionInRS.putIfAbsent(region.getEncodedNameAsBytes(), false); - CloseRegionHandler crh = null; - if (region.isRootRegion()) { - crh = new CloseRootHandler(this, this, region, abort, zk, - versionOfClosingNode); - } else if (region.isMetaRegion()) { - crh = new CloseMetaHandler(this, this, region, abort, zk, - versionOfClosingNode); - } else { - crh = new CloseRegionHandler(this, this, region, abort, zk, - versionOfClosingNode); - } - this.service.submit(crh); - return true; - } - - /** * @param encodedRegionName * encodedregionName to close * @param abort @@ -2804,13 +2708,6 @@ public class HRegionServer extends Regio return sortedRegions; } - @Override - public HRegion getFromOnlineRegions(final String encodedRegionName) { - HRegion r = null; - r = this.onlineRegions.get(encodedRegionName); - return r; - } - /** @return the request count */ public AtomicInteger getRequestCount() { return this.requestCount; @@ -2858,6 +2755,8 @@ public class HRegionServer extends Regio return new ProtocolSignature(HRegionInterface.VERSION, null); } else if (protocol.equals(ClientProtocol.class.getName())) { return new ProtocolSignature(ClientProtocol.VERSION, null); + } else if (protocol.equals(AdminProtocol.class.getName())) { + return new ProtocolSignature(AdminProtocol.VERSION, null); } throw new IOException("Unknown protocol: " + protocol); } @@ -2870,6 +2769,8 @@ public class HRegionServer extends Regio return HRegionInterface.VERSION; } else if (protocol.equals(ClientProtocol.class.getName())) { return ClientProtocol.VERSION; + } else if (protocol.equals(AdminProtocol.class.getName())) { + return AdminProtocol.VERSION; } throw new IOException("Unknown protocol: " + protocol); } Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionThriftServer.java URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionThriftServer.java?rev=1329358&r1=1329357&r2=1329358&view=diff ============================================================================== --- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionThriftServer.java (original) +++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionThriftServer.java Mon Apr 23 18:12:16 2012 @@ -37,16 +37,11 @@ import org.apache.hadoop.hbase.client.Ge import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; -import org.apache.hadoop.hbase.protobuf.RequestConverter; -import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetRequest; -import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetResponse; import org.apache.hadoop.hbase.thrift.ThriftServerRunner; import org.apache.hadoop.hbase.thrift.ThriftUtilities; import org.apache.hadoop.hbase.thrift.generated.IOError; import org.apache.hadoop.hbase.thrift.generated.TRowResult; -import com.google.protobuf.ServiceException; - /** * HRegionThriftServer - this class starts up a Thrift server in the same * JVM where the RegionServer is running. It inherits most of the @@ -136,10 +131,7 @@ public class HRegionThriftServer extends if (columns == null) { Get get = new Get(row); get.setTimeRange(Long.MIN_VALUE, timestamp); - GetRequest request = - RequestConverter.buildGetRequest(regionName, get); - GetResponse response = rs.get(null, request); - Result result = ProtobufUtil.toResult(response.getResult()); + Result result = ProtobufUtil.get(rs, regionName, get); return ThriftUtilities.rowResultFromHBase(result); } Get get = new Get(row); @@ -152,10 +144,7 @@ public class HRegionThriftServer extends } } get.setTimeRange(Long.MIN_VALUE, timestamp); - GetRequest request = - RequestConverter.buildGetRequest(regionName, get); - GetResponse response = rs.get(null, request); - Result result = ProtobufUtil.toResult(response.getResult()); + Result result = ProtobufUtil.get(rs, regionName, get); return ThriftUtilities.rowResultFromHBase(result); } catch (NotServingRegionException e) { if (!redirect) { @@ -165,10 +154,6 @@ public class HRegionThriftServer extends LOG.debug("ThriftServer redirecting getRowWithColumnsTs"); return super.getRowWithColumnsTs(tableName, rowb, columns, timestamp, attributes); - } catch (ServiceException se) { - IOException e = ProtobufUtil.getRemoteException(se); - LOG.warn(e.getMessage(), e); - throw new IOError(e.getMessage()); } catch (IOException e) { LOG.warn(e.getMessage(), e); throw new IOError(e.getMessage()); Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServer.java URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServer.java?rev=1329358&r1=1329357&r2=1329358&view=diff ============================================================================== --- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServer.java (original) +++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServer.java Mon Apr 23 18:12:16 2012 @@ -21,10 +21,14 @@ package org.apache.hadoop.hbase.regionse import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Random; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.logging.Log; @@ -33,12 +37,19 @@ import org.apache.hadoop.classification. import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HConstants.OperationStatusCode; import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.NotServingRegionException; import org.apache.hadoop.hbase.RemoteExceptionHandler; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableDescriptors; import org.apache.hadoop.hbase.UnknownRowLockException; import org.apache.hadoop.hbase.UnknownScannerException; +import org.apache.hadoop.hbase.catalog.CatalogTracker; +import org.apache.hadoop.hbase.catalog.MetaReader; +import org.apache.hadoop.hbase.client.AdminProtocol; import org.apache.hadoop.hbase.client.Append; +import org.apache.hadoop.hbase.client.ClientProtocol; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Increment; @@ -48,13 +59,38 @@ import org.apache.hadoop.hbase.client.Ro import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.coprocessor.Exec; import org.apache.hadoop.hbase.client.coprocessor.ExecResult; +import org.apache.hadoop.hbase.executor.ExecutorService; import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; import org.apache.hadoop.hbase.filter.WritableByteArrayComparable; import org.apache.hadoop.hbase.fs.HFileSystem; import org.apache.hadoop.hbase.ipc.CoprocessorProtocol; -import org.apache.hadoop.hbase.protobuf.ClientProtocol; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.ResponseConverter; +import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionRequest; +import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionResponse; +import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CompactRegionRequest; +import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CompactRegionResponse; +import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.FlushRegionRequest; +import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.FlushRegionResponse; +import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetOnlineRegionRequest; +import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetOnlineRegionResponse; +import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoRequest; +import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse; +import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetServerInfoRequest; +import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetServerInfoResponse; +import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetStoreFileRequest; +import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetStoreFileResponse; +import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionRequest; +import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionResponse; +import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionResponse.RegionOpeningState; +import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryRequest; +import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryResponse; +import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.RollWALWriterRequest; +import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.RollWALWriterResponse; +import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.SplitRegionRequest; +import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.SplitRegionResponse; +import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.StopServerRequest; +import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.StopServerResponse; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ActionResult; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest; @@ -78,15 +114,24 @@ import org.apache.hadoop.hbase.protobuf. import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.UnlockRowRequest; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.UnlockRowResponse; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameBytesPair; +import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionInfo; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType; import org.apache.hadoop.hbase.regionserver.HRegionServer.QosPriority; import org.apache.hadoop.hbase.regionserver.Leases.LeaseStillHeldException; +import org.apache.hadoop.hbase.regionserver.handler.CloseMetaHandler; +import org.apache.hadoop.hbase.regionserver.handler.CloseRegionHandler; +import org.apache.hadoop.hbase.regionserver.handler.CloseRootHandler; +import org.apache.hadoop.hbase.regionserver.handler.OpenMetaHandler; +import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler; +import org.apache.hadoop.hbase.regionserver.handler.OpenRootHandler; import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics; +import org.apache.hadoop.hbase.regionserver.wal.HLog; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.Pair; +import com.google.protobuf.ByteString; import com.google.protobuf.RpcController; import com.google.protobuf.ServiceException; @@ -101,17 +146,45 @@ import com.google.protobuf.ServiceExcept */ @InterfaceAudience.Private public abstract class RegionServer implements - ClientProtocol, Runnable, RegionServerServices { + ClientProtocol, AdminProtocol, Runnable, RegionServerServices { private static final Log LOG = LogFactory.getLog(RegionServer.class); private final Random rand = new Random(); + /* + * Strings to be used in forming the exception message for + * RegionsAlreadyInTransitionException. + */ + protected static final String OPEN = "OPEN"; + protected static final String CLOSE = "CLOSE"; + + //RegionName vs current action in progress + //true - if open region action in progress + //false - if close region action in progress + protected final ConcurrentSkipListMap regionsInTransitionInRS = + new ConcurrentSkipListMap(Bytes.BYTES_COMPARATOR); + protected long maxScannerResultSize; // Cache flushing protected MemStoreFlusher cacheFlusher; + // catalog tracker + protected CatalogTracker catalogTracker; + + /** + * Go here to get table descriptors. + */ + protected TableDescriptors tableDescriptors; + + // Replication services. If no replication, this handler will be null. + protected ReplicationSourceService replicationSourceHandler; + protected ReplicationSinkService replicationSinkHandler; + + // Compactions + public CompactSplitThread compactSplitThread; + final Map scanners = new ConcurrentHashMap(); @@ -125,6 +198,9 @@ public abstract class RegionServer imple // Leases protected Leases leases; + // Instance of the hbase executor service. + protected ExecutorService service; + // Request counter. // Do we need this? Can't we just sum region counters? St.Ack 20110412 protected AtomicInteger requestCount = new AtomicInteger(); @@ -244,6 +320,67 @@ public abstract class RegionServer imple } } + protected void checkIfRegionInTransition(HRegionInfo region, + String currentAction) throws RegionAlreadyInTransitionException { + byte[] encodedName = region.getEncodedNameAsBytes(); + if (this.regionsInTransitionInRS.containsKey(encodedName)) { + boolean openAction = this.regionsInTransitionInRS.get(encodedName); + // The below exception message will be used in master. + throw new RegionAlreadyInTransitionException("Received:" + currentAction + + " for the region:" + region.getRegionNameAsString() + + " ,which we are already trying to " + + (openAction ? OPEN : CLOSE)+ "."); + } + } + + /** + * @param region Region to close + * @param abort True if we are aborting + * @param zk True if we are to update zk about the region close; if the close + * was orchestrated by master, then update zk. If the close is being run by + * the regionserver because its going down, don't update zk. + * @return True if closed a region. + */ + protected boolean closeRegion(HRegionInfo region, final boolean abort, + final boolean zk) { + return closeRegion(region, abort, zk, -1); + } + + + /** + * @param region Region to close + * @param abort True if we are aborting + * @param zk True if we are to update zk about the region close; if the close + * was orchestrated by master, then update zk. If the close is being run by + * the regionserver because its going down, don't update zk. + * @param versionOfClosingNode + * the version of znode to compare when RS transitions the znode from + * CLOSING state. + * @return True if closed a region. + */ + protected boolean closeRegion(HRegionInfo region, final boolean abort, + final boolean zk, final int versionOfClosingNode) { + if (this.regionsInTransitionInRS.containsKey(region.getEncodedNameAsBytes())) { + LOG.warn("Received close for region we are already opening or closing; " + + region.getEncodedName()); + return false; + } + this.regionsInTransitionInRS.putIfAbsent(region.getEncodedNameAsBytes(), false); + CloseRegionHandler crh = null; + if (region.isRootRegion()) { + crh = new CloseRootHandler(this, this, region, abort, zk, + versionOfClosingNode); + } else if (region.isMetaRegion()) { + crh = new CloseMetaHandler(this, this, region, abort, zk, + versionOfClosingNode); + } else { + crh = new CloseRegionHandler(this, this, region, abort, zk, + versionOfClosingNode); + } + this.service.submit(crh); + return true; + } + /** * @param regionName * @return HRegion for the passed binary regionName or null if @@ -254,6 +391,11 @@ public abstract class RegionServer imple return this.onlineRegions.get(encodedRegionName); } + @Override + public HRegion getFromOnlineRegions(final String encodedRegionName) { + return this.onlineRegions.get(encodedRegionName); + } + /** * Protected utility method for safely obtaining an HRegion handle. * @@ -1002,6 +1144,352 @@ public abstract class RegionServer imple } // End Client methods +// Start Admin methods + + @Override + @QosPriority(priority=HIGH_QOS) + public GetRegionInfoResponse getRegionInfo(final RpcController controller, + final GetRegionInfoRequest request) throws ServiceException { + try { + checkOpen(); + requestCount.incrementAndGet(); + HRegion region = getRegion(request.getRegion()); + HRegionInfo info = region.getRegionInfo(); + GetRegionInfoResponse.Builder builder = GetRegionInfoResponse.newBuilder(); + builder.setRegionInfo(ProtobufUtil.toRegionInfo(info)); + return builder.build(); + } catch (IOException ie) { + throw new ServiceException(ie); + } + } + + @Override + public GetStoreFileResponse getStoreFile(final RpcController controller, + final GetStoreFileRequest request) throws ServiceException { + try { + HRegion region = getRegion(request.getRegion()); + requestCount.incrementAndGet(); + Set columnFamilies = null; + if (request.getFamilyCount() == 0) { + columnFamilies = region.getStores().keySet(); + } else { + columnFamilies = new HashSet(); + for (ByteString cf: request.getFamilyList()) { + columnFamilies.add(cf.toByteArray()); + } + } + int nCF = columnFamilies.size(); + List fileList = region.getStoreFileList( + columnFamilies.toArray(new byte[nCF][])); + GetStoreFileResponse.Builder builder = GetStoreFileResponse.newBuilder(); + builder.addAllStoreFile(fileList); + return builder.build(); + } catch (IOException ie) { + throw new ServiceException(ie); + } + } + + @Override + @QosPriority(priority=HIGH_QOS) + public GetOnlineRegionResponse getOnlineRegion(final RpcController controller, + final GetOnlineRegionRequest request) throws ServiceException { + try { + checkOpen(); + requestCount.incrementAndGet(); + List list = new ArrayList(onlineRegions.size()); + for (Map.Entry e: this.onlineRegions.entrySet()) { + list.add(e.getValue().getRegionInfo()); + } + Collections.sort(list); + GetOnlineRegionResponse.Builder builder = GetOnlineRegionResponse.newBuilder(); + for (HRegionInfo region: list) { + builder.addRegionInfo(ProtobufUtil.toRegionInfo(region)); + } + return builder.build(); + } catch (IOException ie) { + throw new ServiceException(ie); + } + } + + + // Region open/close direct RPCs + + /** + * Open a region on the region server. + * + * @param controller the RPC controller + * @param request the request + * @throws ServiceException + */ + @Override + @QosPriority(priority=HIGH_QOS) + public OpenRegionResponse openRegion(final RpcController controller, + final OpenRegionRequest request) throws ServiceException { + int versionOfOfflineNode = -1; + if (request.hasVersionOfOfflineNode()) { + versionOfOfflineNode = request.getVersionOfOfflineNode(); + } + try { + checkOpen(); + requestCount.incrementAndGet(); + OpenRegionResponse.Builder + builder = OpenRegionResponse.newBuilder(); + for (RegionInfo regionInfo: request.getRegionList()) { + HRegionInfo region = ProtobufUtil.toRegionInfo(regionInfo); + checkIfRegionInTransition(region, OPEN); + + HRegion onlineRegion = getFromOnlineRegions(region.getEncodedName()); + if (null != onlineRegion) { + // See HBASE-5094. Cross check with META if still this RS is owning the + // region. + Pair p = MetaReader.getRegion( + this.catalogTracker, region.getRegionName()); + if (this.getServerName().equals(p.getSecond())) { + LOG.warn("Attempted open of " + region.getEncodedName() + + " but already online on this server"); + builder.addOpeningState(RegionOpeningState.ALREADY_OPENED); + continue; + } else { + LOG.warn("The region " + region.getEncodedName() + + " is online on this server but META does not have this server."); + removeFromOnlineRegions(region.getEncodedName()); + } + } + LOG.info("Received request to open region: " + region.getEncodedName()); + this.regionsInTransitionInRS.putIfAbsent(region.getEncodedNameAsBytes(), true); + HTableDescriptor htd = this.tableDescriptors.get(region.getTableName()); + // Need to pass the expected version in the constructor. + if (region.isRootRegion()) { + this.service.submit(new OpenRootHandler(this, this, region, htd, + versionOfOfflineNode)); + } else if (region.isMetaRegion()) { + this.service.submit(new OpenMetaHandler(this, this, region, htd, + versionOfOfflineNode)); + } else { + this.service.submit(new OpenRegionHandler(this, this, region, htd, + versionOfOfflineNode)); + } + builder.addOpeningState(RegionOpeningState.OPENED); + } + return builder.build(); + } catch (IOException ie) { + throw new ServiceException(ie); + } + } + + /** + * Close a region on the region server. + * + * @param controller the RPC controller + * @param request the request + * @throws ServiceException + */ + @Override + @QosPriority(priority=HIGH_QOS) + public CloseRegionResponse closeRegion(final RpcController controller, + final CloseRegionRequest request) throws ServiceException { + int versionOfClosingNode = -1; + if (request.hasVersionOfClosingNode()) { + versionOfClosingNode = request.getVersionOfClosingNode(); + } + boolean zk = request.getTransitionInZK(); + try { + checkOpen(); + requestCount.incrementAndGet(); + HRegion region = getRegion(request.getRegion()); + CloseRegionResponse.Builder + builder = CloseRegionResponse.newBuilder(); + LOG.info("Received close region: " + region.getRegionNameAsString() + + ". Version of ZK closing node:" + versionOfClosingNode); + HRegionInfo regionInfo = region.getRegionInfo(); + checkIfRegionInTransition(regionInfo, CLOSE); + boolean closed = closeRegion( + regionInfo, false, zk, versionOfClosingNode); + builder.setClosed(closed); + return builder.build(); + } catch (IOException ie) { + throw new ServiceException(ie); + } + } + + /** + * Flush a region on the region server. + * + * @param controller the RPC controller + * @param request the request + * @throws ServiceException + */ + @Override + @QosPriority(priority=HIGH_QOS) + public FlushRegionResponse flushRegion(final RpcController controller, + final FlushRegionRequest request) throws ServiceException { + try { + checkOpen(); + requestCount.incrementAndGet(); + HRegion region = getRegion(request.getRegion()); + LOG.info("Flushing " + region.getRegionNameAsString()); + boolean shouldFlush = true; + if (request.hasIfOlderThanTs()) { + shouldFlush = region.getLastFlushTime() < request.getIfOlderThanTs(); + } + FlushRegionResponse.Builder builder = FlushRegionResponse.newBuilder(); + if (shouldFlush) { + builder.setFlushed(region.flushcache()); + } + builder.setLastFlushTime(region.getLastFlushTime()); + return builder.build(); + } catch (IOException ie) { + throw new ServiceException(ie); + } + } + + /** + * Split a region on the region server. + * + * @param controller the RPC controller + * @param request the request + * @throws ServiceException + */ + @Override + @QosPriority(priority=HIGH_QOS) + public SplitRegionResponse splitRegion(final RpcController controller, + final SplitRegionRequest request) throws ServiceException { + try { + checkOpen(); + requestCount.incrementAndGet(); + HRegion region = getRegion(request.getRegion()); + LOG.info("Splitting " + region.getRegionNameAsString()); + region.flushcache(); + byte[] splitPoint = null; + if (request.hasSplitPoint()) { + splitPoint = request.getSplitPoint().toByteArray(); + } + region.forceSplit(splitPoint); + compactSplitThread.requestSplit(region, region.checkSplit()); + return SplitRegionResponse.newBuilder().build(); + } catch (IOException ie) { + throw new ServiceException(ie); + } + } + + /** + * Compact a region on the region server. + * + * @param controller the RPC controller + * @param request the request + * @throws ServiceException + */ + @Override + @QosPriority(priority=HIGH_QOS) + public CompactRegionResponse compactRegion(final RpcController controller, + final CompactRegionRequest request) throws ServiceException { + try { + checkOpen(); + requestCount.incrementAndGet(); + HRegion region = getRegion(request.getRegion()); + LOG.info("Compacting " + region.getRegionNameAsString()); + boolean major = false; + if (request.hasMajor()) { + major = request.getMajor(); + } + if (major) { + region.triggerMajorCompaction(); + } + compactSplitThread.requestCompaction(region, + "User-triggered " + (major ? "major " : "") + "compaction", + CompactSplitThread.PRIORITY_USER); + return CompactRegionResponse.newBuilder().build(); + } catch (IOException ie) { + throw new ServiceException(ie); + } + } + + /** + * Replicate WAL entries on the region server. + * + * @param controller the RPC controller + * @param request the request + * @throws ServiceException + */ + @Override + @QosPriority(priority=HIGH_QOS) + public ReplicateWALEntryResponse replicateWALEntry(final RpcController controller, + final ReplicateWALEntryRequest request) throws ServiceException { + try { + if (replicationSinkHandler != null) { + checkOpen(); + requestCount.incrementAndGet(); + HLog.Entry[] entries = ProtobufUtil.toHLogEntries(request.getEntryList()); + if (entries != null && entries.length > 0) { + replicationSinkHandler.replicateLogEntries(entries); + } + } + return ReplicateWALEntryResponse.newBuilder().build(); + } catch (IOException ie) { + throw new ServiceException(ie); + } + } + + /** + * Roll the WAL writer of the region server. + * + * @param controller the RPC controller + * @param request the request + * @throws ServiceException + */ + @Override + public RollWALWriterResponse rollWALWriter(final RpcController controller, + final RollWALWriterRequest request) throws ServiceException { + try { + requestCount.incrementAndGet(); + HLog wal = this.getWAL(); + byte[][] regionsToFlush = wal.rollWriter(true); + RollWALWriterResponse.Builder builder = RollWALWriterResponse.newBuilder(); + if (regionsToFlush != null) { + for (byte[] region: regionsToFlush) { + builder.addRegionToFlush(ByteString.copyFrom(region)); + } + } + return builder.build(); + } catch (IOException ie) { + throw new ServiceException(ie); + } + } + + /** + * Stop the region server. + * + * @param controller the RPC controller + * @param request the request + * @throws ServiceException + */ + @Override + public StopServerResponse stopServer(final RpcController controller, + final StopServerRequest request) throws ServiceException { + requestCount.incrementAndGet(); + String reason = request.getReason(); + stop(reason); + return StopServerResponse.newBuilder().build(); + } + + /** + * Get some information of the region server. + * + * @param controller the RPC controller + * @param request the request + * @throws ServiceException + */ + @Override + public GetServerInfoResponse getServerInfo(final RpcController controller, + final GetServerInfoRequest request) throws ServiceException { + ServerName serverName = getServerName(); + requestCount.incrementAndGet(); + GetServerInfoResponse.Builder builder = GetServerInfoResponse.newBuilder(); + builder.setServerName(ProtobufUtil.toServerName(serverName)); + return builder.build(); + } + +// End Admin methods /** * Find the HRegion based on a region specifier Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java?rev=1329358&r1=1329357&r2=1329358&view=diff ============================================================================== --- hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java (original) +++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java Mon Apr 23 18:12:16 2012 @@ -48,9 +48,10 @@ import org.apache.hadoop.hbase.HConstant import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.Stoppable; +import org.apache.hadoop.hbase.client.AdminProtocol; import org.apache.hadoop.hbase.client.HConnection; import org.apache.hadoop.hbase.client.HConnectionManager; -import org.apache.hadoop.hbase.ipc.HRegionInterface; +import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.regionserver.wal.HLog; import org.apache.hadoop.hbase.regionserver.wal.HLogKey; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; @@ -605,9 +606,10 @@ public class ReplicationSource extends T continue; } try { - HRegionInterface rrs = getRS(); + AdminProtocol rrs = getRS(); LOG.debug("Replicating " + currentNbEntries); - rrs.replicateLogEntries(Arrays.copyOf(this.entriesArray, currentNbEntries)); + ProtobufUtil.replicateWALEntry(rrs, + Arrays.copyOf(this.entriesArray, currentNbEntries)); if (this.lastLoggedPosition != this.position) { this.manager.logPositionAndCleanOldLogs(this.currentPath, this.peerClusterZnode, this.position, queueRecovered); @@ -727,13 +729,13 @@ public class ReplicationSource extends T * @return * @throws IOException */ - private HRegionInterface getRS() throws IOException { + private AdminProtocol getRS() throws IOException { if (this.currentPeers.size() == 0) { throw new IOException(this.peerClusterZnode + " has 0 region servers"); } ServerName address = currentPeers.get(random.nextInt(this.currentPeers.size())); - return this.conn.getHRegionConnection(address.getHostname(), address.getPort()); + return this.conn.getAdmin(address.getHostname(), address.getPort()); } /** @@ -746,9 +748,9 @@ public class ReplicationSource extends T Thread pingThread = new Thread() { public void run() { try { - HRegionInterface rrs = getRS(); + AdminProtocol rrs = getRS(); // Dummy call which should fail - rrs.getHServerInfo(); + ProtobufUtil.getServerInfo(rrs); latch.countDown(); } catch (IOException ex) { if (ex instanceof RemoteException) { Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java?rev=1329358&r1=1329357&r2=1329358&view=diff ============================================================================== --- hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java (original) +++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java Mon Apr 23 18:12:16 2012 @@ -57,6 +57,7 @@ import org.apache.hadoop.hbase.MasterNot import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.ZooKeeperConnectionException; import org.apache.hadoop.hbase.catalog.MetaReader; +import org.apache.hadoop.hbase.client.AdminProtocol; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.HBaseAdmin; @@ -70,8 +71,8 @@ import org.apache.hadoop.hbase.client.Pu import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.io.hfile.CacheConfig; import org.apache.hadoop.hbase.io.hfile.HFile; -import org.apache.hadoop.hbase.ipc.HRegionInterface; import org.apache.hadoop.hbase.master.MasterFileSystem; +import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.wal.HLog; import org.apache.hadoop.hbase.util.HBaseFsck.ErrorReporter.ERROR_CODE; @@ -2620,11 +2621,11 @@ public class HBaseFsck { public synchronized void run() { errors.progress(); try { - HRegionInterface server = - connection.getHRegionConnection(rsinfo.getHostname(), rsinfo.getPort()); + AdminProtocol server = + connection.getAdmin(rsinfo.getHostname(), rsinfo.getPort()); // list all online regions from this region server - List regions = server.getOnlineRegions(); + List regions = ProtobufUtil.getOnlineRegions(server); if (hbck.checkMetaOnly) { regions = filterOnlyMetaRegions(regions); } Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/HBaseFsckRepair.java URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/HBaseFsckRepair.java?rev=1329358&r1=1329357&r2=1329358&view=diff ============================================================================== --- hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/HBaseFsckRepair.java (original) +++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/HBaseFsckRepair.java Mon Apr 23 18:12:16 2012 @@ -34,12 +34,13 @@ import org.apache.hadoop.hbase.HRegionIn import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.ZooKeeperConnectionException; +import org.apache.hadoop.hbase.client.AdminProtocol; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HConnection; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.ipc.HRegionInterface; import org.apache.hadoop.hbase.master.AssignmentManager.RegionState; +import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.wal.HLog; import org.apache.zookeeper.KeeperException; @@ -149,17 +150,16 @@ public class HBaseFsckRepair { public static void closeRegionSilentlyAndWait(HBaseAdmin admin, ServerName server, HRegionInfo region) throws IOException, InterruptedException { HConnection connection = admin.getConnection(); - HRegionInterface rs = connection.getHRegionConnection(server.getHostname(), - server.getPort()); - rs.closeRegion(region, false); + AdminProtocol rs = connection.getAdmin(server.getHostname(), server.getPort()); + ProtobufUtil.closeRegion(rs, region.getRegionName(), false); long timeout = admin.getConfiguration() .getLong("hbase.hbck.close.timeout", 120000); long expiration = timeout + System.currentTimeMillis(); while (System.currentTimeMillis() < expiration) { try { - HRegionInfo rsRegion = rs.getRegionInfo(region.getRegionName()); - if (rsRegion == null) - return; + HRegionInfo rsRegion = + ProtobufUtil.getRegionInfo(rs, region.getRegionName()); + if (rsRegion == null) return; } catch (IOException ioe) { return; } Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/SortedCopyOnWriteSet.java URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/SortedCopyOnWriteSet.java?rev=1329358&r1=1329357&r2=1329358&view=diff ============================================================================== --- hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/SortedCopyOnWriteSet.java (original) +++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/SortedCopyOnWriteSet.java Mon Apr 23 18:12:16 2012 @@ -49,7 +49,7 @@ import org.apache.hadoop.classification. @InterfaceAudience.Public @InterfaceStability.Stable public class SortedCopyOnWriteSet implements SortedSet { - private SortedSet internalSet; + private volatile SortedSet internalSet; public SortedCopyOnWriteSet() { this.internalSet = new TreeSet(); Modified: hbase/trunk/src/main/protobuf/Admin.proto URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/protobuf/Admin.proto?rev=1329358&r1=1329357&r2=1329358&view=diff ============================================================================== --- hbase/trunk/src/main/protobuf/Admin.proto (original) +++ hbase/trunk/src/main/protobuf/Admin.proto Mon Apr 23 18:12:16 2012 @@ -38,12 +38,12 @@ message GetRegionInfoResponse { * Get a list of store files for a set of column families in a particular region. * If no column family is specified, get the store files for all column families. */ -message GetStoreFileListRequest { +message GetStoreFileRequest { required RegionSpecifier region = 1; - repeated bytes columnFamily = 2; + repeated bytes family = 2; } -message GetStoreFileListResponse { +message GetStoreFileResponse { repeated string storeFile = 1; } @@ -55,7 +55,7 @@ message GetOnlineRegionResponse { } message OpenRegionRequest { - repeated RegionSpecifier region = 1; + repeated RegionInfo region = 1; optional uint32 versionOfOfflineNode = 2; } @@ -133,7 +133,7 @@ message UUID { // Protocol buffer version of HLog message WALEntry { - required WALKey walKey = 1; + required WALKey key = 1; required WALEdit edit = 2; // Protocol buffer version of HLogKey @@ -146,7 +146,7 @@ message WALEntry { } message WALEdit { - repeated bytes keyValue = 1; + repeated bytes keyValueBytes = 1; repeated FamilyScope familyScope = 2; enum ScopeType { @@ -168,7 +168,7 @@ message WALEntry { * hbase.replication has to be set to true for this to work. */ message ReplicateWALEntryRequest { - repeated WALEntry walEntry = 1; + repeated WALEntry entry = 1; } message ReplicateWALEntryResponse { @@ -201,8 +201,8 @@ service AdminService { rpc getRegionInfo(GetRegionInfoRequest) returns(GetRegionInfoResponse); - rpc getStoreFileList(GetStoreFileListRequest) - returns(GetStoreFileListResponse); + rpc getStoreFile(GetStoreFileRequest) + returns(GetStoreFileResponse); rpc getOnlineRegion(GetOnlineRegionRequest) returns(GetOnlineRegionResponse); Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/catalog/TestCatalogTracker.java URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/catalog/TestCatalogTracker.java?rev=1329358&r1=1329357&r2=1329358&view=diff ============================================================================== --- hbase/trunk/src/test/java/org/apache/hadoop/hbase/catalog/TestCatalogTracker.java (original) +++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/catalog/TestCatalogTracker.java Mon Apr 23 18:12:16 2012 @@ -32,8 +32,18 @@ import junit.framework.Assert; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.*; -import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Abortable; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.MediumTests; +import org.apache.hadoop.hbase.NotAllMetaRegionsOnlineException; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.client.AdminProtocol; +import org.apache.hadoop.hbase.client.ClientProtocol; import org.apache.hadoop.hbase.client.HConnection; import org.apache.hadoop.hbase.client.HConnectionManager; import org.apache.hadoop.hbase.client.HConnectionTestingUtility; @@ -41,7 +51,9 @@ import org.apache.hadoop.hbase.client.Re import org.apache.hadoop.hbase.client.RetriesExhaustedException; import org.apache.hadoop.hbase.client.ServerCallable; import org.apache.hadoop.hbase.ipc.HRegionInterface; -import org.apache.hadoop.hbase.protobuf.ClientProtocol; +import org.apache.hadoop.hbase.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoRequest; +import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetRequest; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetResponse; import org.apache.hadoop.hbase.util.Bytes; @@ -186,16 +198,19 @@ public class TestCatalogTracker { @Test public void testServerNotRunningIOException() throws IOException, InterruptedException, KeeperException, ServiceException { - // Mock an HRegionInterface. - final HRegionInterface implementation = Mockito.mock(HRegionInterface.class); + // Mock an Admin and a Client. + final AdminProtocol admin = Mockito.mock(AdminProtocol.class); final ClientProtocol client = Mockito.mock(ClientProtocol.class); - HConnection connection = mockConnection(implementation, client); + HConnection connection = mockConnection(admin, client); try { - // If a 'getRegionInfo' is called on mocked HRegionInterface, throw IOE + // If a 'getRegionInfo' is called on mocked AdminProtocol, throw IOE // the first time. 'Succeed' the second time we are called. - Mockito.when(implementation.getRegionInfo((byte[]) Mockito.any())). - thenThrow(new IOException("Server not running, aborting")). - thenReturn(new HRegionInfo()); + GetRegionInfoResponse.Builder builder = GetRegionInfoResponse.newBuilder(); + builder.setRegionInfo(ProtobufUtil.toRegionInfo(new HRegionInfo(Bytes.toBytes("test")))); + Mockito.when(admin.getRegionInfo((RpcController)Mockito.any(), + (GetRegionInfoRequest)Mockito.any())).thenThrow( + new ServiceException(new IOException("Server not running, aborting"))). + thenReturn(builder.build()); // After we encounter the above 'Server not running', we should catch the // IOE and go into retrying for the meta mode. We'll do gets on -ROOT- to @@ -292,18 +307,19 @@ public class TestCatalogTracker { * @throws IOException * @throws InterruptedException * @throws KeeperException + * @throws ServiceException */ @Test public void testVerifyRootRegionLocationFails() - throws IOException, InterruptedException, KeeperException { + throws IOException, InterruptedException, KeeperException, ServiceException { HConnection connection = Mockito.mock(HConnection.class); - ConnectException connectException = - new ConnectException("Connection refused"); - final HRegionInterface implementation = - Mockito.mock(HRegionInterface.class); - Mockito.when(implementation.getRegionInfo((byte [])Mockito.any())). - thenThrow(connectException); - Mockito.when(connection.getHRegionConnection(Mockito.anyString(), + ServiceException connectException = + new ServiceException(new ConnectException("Connection refused")); + final AdminProtocol implementation = + Mockito.mock(AdminProtocol.class); + Mockito.when(implementation.getRegionInfo((RpcController)Mockito.any(), + (GetRegionInfoRequest)Mockito.any())).thenThrow(connectException); + Mockito.when(connection.getAdmin(Mockito.anyString(), Mockito.anyInt(), Mockito.anyBoolean())). thenReturn(implementation); final CatalogTracker ct = constructAndStartCatalogTracker(connection); @@ -379,11 +395,11 @@ public class TestCatalogTracker { // that ... and so one. @Test public void testNoTimeoutWaitForMeta() throws Exception { - // Mock an HConnection and a HRegionInterface implementation. Have the + // Mock an HConnection and a AdminProtocol implementation. Have the // HConnection return the HRI. Have the HRI return a few mocked up responses // to make our test work. - // Mock an HRegionInterface. - final HRegionInterface implementation = Mockito.mock(HRegionInterface.class); + // Mock an AdminProtocol. + final AdminProtocol implementation = Mockito.mock(AdminProtocol.class); HConnection connection = mockConnection(implementation, null); try { // Now the ct is up... set into the mocks some answers that make it look @@ -396,8 +412,10 @@ public class TestCatalogTracker { // It works for now but has been deprecated. Mockito.when(connection.getRegionServerWithRetries((ServerCallable)Mockito.any())). thenReturn(result); - Mockito.when(implementation.getRegionInfo((byte[]) Mockito.any())). - thenReturn(HRegionInfo.FIRST_META_REGIONINFO); + GetRegionInfoResponse.Builder builder = GetRegionInfoResponse.newBuilder(); + builder.setRegionInfo(ProtobufUtil.toRegionInfo(HRegionInfo.FIRST_META_REGIONINFO)); + Mockito.when(implementation.getRegionInfo((RpcController)Mockito.any(), + (GetRegionInfoRequest)Mockito.any())).thenReturn(builder.build()); final CatalogTracker ct = constructAndStartCatalogTracker(connection); ServerName hsa = ct.getMetaLocation(); Assert.assertNull(hsa); @@ -430,7 +448,7 @@ public class TestCatalogTracker { } /** - * @param implementation An {@link HRegionInterface} instance; you'll likely + * @param admin An {@link AdminProtocol} instance; you'll likely * want to pass a mocked HRS; can be null. * @param client A mocked ClientProtocol instance, can be null * @return Mock up a connection that returns a {@link Configuration} when @@ -443,9 +461,8 @@ public class TestCatalogTracker { * when done with this mocked Connection. * @throws IOException */ - private HConnection mockConnection( - final HRegionInterface implementation, final ClientProtocol client) - throws IOException { + private HConnection mockConnection(final AdminProtocol admin, + final ClientProtocol client) throws IOException { HConnection connection = HConnectionTestingUtility.getMockedConnection(UTIL.getConfiguration()); Mockito.doNothing().when(connection).close(); @@ -459,10 +476,10 @@ public class TestCatalogTracker { Mockito.when(connection.locateRegion((byte[]) Mockito.any(), (byte[]) Mockito.any())). thenReturn(anyLocation); - if (implementation != null) { + if (admin != null) { // If a call to getHRegionConnection, return this implementation. - Mockito.when(connection.getHRegionConnection(Mockito.anyString(), Mockito.anyInt())). - thenReturn(implementation); + Mockito.when(connection.getAdmin(Mockito.anyString(), Mockito.anyInt())). + thenReturn(admin); } if (client != null) { // If a call to getClient, return this implementation. Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/catalog/TestMetaReaderEditorNoCluster.java URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/catalog/TestMetaReaderEditorNoCluster.java?rev=1329358&r1=1329357&r2=1329358&view=diff ============================================================================== --- hbase/trunk/src/test/java/org/apache/hadoop/hbase/catalog/TestMetaReaderEditorNoCluster.java (original) +++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/catalog/TestMetaReaderEditorNoCluster.java Mon Apr 23 18:12:16 2012 @@ -27,12 +27,12 @@ import java.util.NavigableMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.*; +import org.apache.hadoop.hbase.client.ClientProtocol; import org.apache.hadoop.hbase.client.HConnection; import org.apache.hadoop.hbase.client.HConnectionManager; import org.apache.hadoop.hbase.client.HConnectionTestingUtility; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; -import org.apache.hadoop.hbase.protobuf.ClientProtocol; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse; import org.apache.hadoop.hbase.util.Bytes; Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java?rev=1329358&r1=1329357&r2=1329358&view=diff ============================================================================== --- hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java (original) +++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java Mon Apr 23 18:12:16 2012 @@ -24,10 +24,11 @@ import org.apache.hadoop.hbase.HRegionIn import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.ZooKeeperConnectionException; +import org.apache.hadoop.hbase.client.AdminProtocol; +import org.apache.hadoop.hbase.client.ClientProtocol; import org.apache.hadoop.hbase.client.HConnectionManager.HConnectionImplementation; import org.apache.hadoop.hbase.client.HConnectionManager.HConnectionKey; import org.apache.hadoop.hbase.ipc.HRegionInterface; -import org.apache.hadoop.hbase.protobuf.ClientProtocol; import org.mockito.Mockito; /** @@ -72,14 +73,14 @@ public class HConnectionTestingUtility { * connection when done by calling * {@link HConnectionManager#deleteConnection(Configuration, boolean)} else it * will stick around; this is probably not what you want. - * @param implementation An {@link HRegionInterface} instance; you'll likely - * want to pass a mocked HRS; can be null. - * + * * @param conf Configuration to use - * @param implementation An HRegionInterface; can be null but is usually + * @param admin An AdminProtocol; can be null but is usually + * itself a mock. + * @param client A ClientProtocol; can be null but is usually * itself a mock. * @param sn ServerName to include in the region location returned by this - * implementation + * connection * @param hri HRegionInfo to include in the location returned when * getRegionLocation is called on the mocked connection * @return Mock up a connection that returns a {@link Configuration} when @@ -93,7 +94,7 @@ public class HConnectionTestingUtility { * @throws IOException */ public static HConnection getMockedConnectionAndDecorate(final Configuration conf, - final HRegionInterface implementation, final ClientProtocol client, + final AdminProtocol admin, final ClientProtocol client, final ServerName sn, final HRegionInfo hri) throws IOException { HConnection c = HConnectionTestingUtility.getMockedConnection(conf); @@ -105,10 +106,10 @@ public class HConnectionTestingUtility { thenReturn(loc); Mockito.when(c.locateRegion((byte[]) Mockito.any(), (byte[]) Mockito.any())). thenReturn(loc); - if (implementation != null) { - // If a call to getHRegionConnection, return this implementation. - Mockito.when(c.getHRegionConnection(Mockito.anyString(), Mockito.anyInt())). - thenReturn(implementation); + if (admin != null) { + // If a call to getAdmin, return this implementation. + Mockito.when(c.getAdmin(Mockito.anyString(), Mockito.anyInt())). + thenReturn(admin); } if (client != null) { // If a call to getClient, return this client. Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestAdmin.java URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestAdmin.java?rev=1329358&r1=1329357&r2=1329358&view=diff ============================================================================== --- hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestAdmin.java (original) +++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestAdmin.java Mon Apr 23 18:12:16 2012 @@ -1220,8 +1220,12 @@ public class TestAdmin { if (!regionInfo.isMetaTable()) { if (regionInfo.getRegionNameAsString().contains("TestHBACloseRegion1")) { info = regionInfo; - admin.closeRegionWithEncodedRegionName("sample", rs.getServerName() + try { + admin.closeRegionWithEncodedRegionName("sample", rs.getServerName() .getServerName()); + } catch (NotServingRegionException nsre) { + // expected, ignore it + } } } } @@ -1320,8 +1324,12 @@ public class TestAdmin { if (!regionInfo.isMetaTable()) { if (regionInfo.getRegionNameAsString().contains("TestHBACloseRegion4")) { info = regionInfo; - admin.closeRegionWithEncodedRegionName(regionInfo + try { + admin.closeRegionWithEncodedRegionName(regionInfo .getRegionNameAsString(), rs.getServerName().getServerName()); + } catch (NotServingRegionException nsre) { + // expected, ignore it. + } } } } Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java?rev=1329358&r1=1329357&r2=1329358&view=diff ============================================================================== --- hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java (original) +++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java Mon Apr 23 18:12:16 2012 @@ -22,7 +22,7 @@ package org.apache.hadoop.hbase.client; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; -import java.util.ArrayList; +import java.util.List; import java.util.Random; import org.apache.commons.logging.Log; @@ -32,7 +32,8 @@ import org.apache.hadoop.hbase.HColumnDe import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.LargeTests; -import org.apache.hadoop.hbase.ipc.HRegionInterface; +import org.apache.hadoop.hbase.client.AdminProtocol; +import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.junit.After; @@ -42,17 +43,12 @@ import org.junit.BeforeClass; import org.junit.Test; import org.junit.experimental.categories.Category; -import com.google.common.collect.Lists; - @Category(LargeTests.class) public class TestFromClientSide3 { final Log LOG = LogFactory.getLog(getClass()); private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); - private static byte[] ROW = Bytes.toBytes("testRow"); private static byte[] FAMILY = Bytes.toBytes("testFamily"); - private static byte[] QUALIFIER = Bytes.toBytes("testQualifier"); - private static byte[] VALUE = Bytes.toBytes("testValue"); private static Random random = new Random(); private static int SLAVES = 3; @@ -108,19 +104,21 @@ public class TestFromClientSide3 { HConnection conn = HConnectionManager.getConnection(TEST_UTIL .getConfiguration()); HRegionLocation loc = table.getRegionLocation(row, true); - HRegionInterface server = conn.getHRegionConnection(loc.getHostname(), loc + AdminProtocol server = conn.getAdmin(loc.getHostname(), loc .getPort()); byte[] regName = loc.getRegionInfo().getRegionName(); for (int i = 0; i < nFlushes; i++) { randomCFPuts(table, row, family, nPuts); - int sfCount = server.getStoreFileList(regName, FAMILY).size(); + List sf = ProtobufUtil.getStoreFiles(server, regName, FAMILY); + int sfCount = sf.size(); // TODO: replace this api with a synchronous flush after HBASE-2949 admin.flush(table.getTableName()); // synchronously poll wait for a new storefile to appear (flush happened) - while (server.getStoreFileList(regName, FAMILY).size() == sfCount) { + while (ProtobufUtil.getStoreFiles( + server, regName, FAMILY).size() == sfCount) { Thread.sleep(40); } } @@ -154,9 +152,10 @@ public class TestFromClientSide3 { // Verify we have multiple store files. HRegionLocation loc = hTable.getRegionLocation(row, true); byte[] regionName = loc.getRegionInfo().getRegionName(); - HRegionInterface server = connection.getHRegionConnection( - loc.getHostname(), loc.getPort()); - assertTrue(server.getStoreFileList(regionName, FAMILY).size() > 1); + AdminProtocol server = connection.getAdmin( + loc.getHostname(), loc.getPort()); + assertTrue(ProtobufUtil.getStoreFiles( + server, regionName, FAMILY).size() > 1); // Issue a compaction request admin.compact(TABLE); @@ -167,16 +166,17 @@ public class TestFromClientSide3 { loc = hTable.getRegionLocation(row, true); if (!loc.getRegionInfo().isOffline()) { regionName = loc.getRegionInfo().getRegionName(); - server = connection.getHRegionConnection(loc.getHostname(), loc - .getPort()); - if (server.getStoreFileList(regionName, FAMILY).size() <= 1) { + server = connection.getAdmin(loc.getHostname(), loc.getPort()); + if (ProtobufUtil.getStoreFiles( + server, regionName, FAMILY).size() <= 1) { break; } } Thread.sleep(40); } // verify the compactions took place and that we didn't just time out - assertTrue(server.getStoreFileList(regionName, FAMILY).size() <= 1); + assertTrue(ProtobufUtil.getStoreFiles( + server, regionName, FAMILY).size() <= 1); // change the compaction.min config option for this table to 5 LOG.info("hbase.hstore.compaction.min should now be 5"); @@ -198,11 +198,11 @@ public class TestFromClientSide3 { // This time, the compaction request should not happen Thread.sleep(10 * 1000); - int sfCount = 0; loc = hTable.getRegionLocation(row, true); regionName = loc.getRegionInfo().getRegionName(); - server = connection.getHRegionConnection(loc.getHostname(), loc.getPort()); - sfCount = server.getStoreFileList(regionName, FAMILY).size(); + server = connection.getAdmin(loc.getHostname(), loc.getPort()); + int sfCount = ProtobufUtil.getStoreFiles( + server, regionName, FAMILY).size(); assertTrue(sfCount > 1); // change an individual CF's config option to 2 & online schema update @@ -225,9 +225,10 @@ public class TestFromClientSide3 { loc = hTable.getRegionLocation(row, true); regionName = loc.getRegionInfo().getRegionName(); try { - server = connection.getHRegionConnection(loc.getHostname(), loc + server = connection.getAdmin(loc.getHostname(), loc .getPort()); - if (server.getStoreFileList(regionName, FAMILY).size() < sfCount) { + if (ProtobufUtil.getStoreFiles( + server, regionName, FAMILY).size() < sfCount) { break; } } catch (Exception e) { @@ -236,7 +237,8 @@ public class TestFromClientSide3 { Thread.sleep(40); } // verify the compaction took place and that we didn't just time out - assertTrue(server.getStoreFileList(regionName, FAMILY).size() < sfCount); + assertTrue(ProtobufUtil.getStoreFiles( + server, regionName, FAMILY).size() < sfCount); // Finally, ensure that we can remove a custom config value after we made it LOG.info("Removing CF config value"); Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestHTableUtil.java URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestHTableUtil.java?rev=1329358&r1=1329357&r2=1329358&view=diff ============================================================================== --- hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestHTableUtil.java (original) +++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestHTableUtil.java Mon Apr 23 18:12:16 2012 @@ -42,7 +42,6 @@ import org.junit.experimental.categories public class TestHTableUtil { final Log LOG = LogFactory.getLog(getClass()); private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); - private static byte [] ROW = Bytes.toBytes("testRow"); private static byte [] FAMILY = Bytes.toBytes("testFamily"); private static byte [] QUALIFIER = Bytes.toBytes("testQualifier"); private static byte [] VALUE = Bytes.toBytes("testValue"); Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java?rev=1329358&r1=1329357&r2=1329358&view=diff ============================================================================== --- hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java (original) +++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java Mon Apr 23 18:12:16 2012 @@ -42,13 +42,13 @@ import org.apache.hadoop.hbase.HRegionLo import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.LargeTests; import org.apache.hadoop.hbase.TableExistsException; +import org.apache.hadoop.hbase.client.ClientProtocol; import org.apache.hadoop.hbase.client.HConnection; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.client.ServerCallable; -import org.apache.hadoop.hbase.ipc.HRegionInterface; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest; import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.TestHRegionServerBulkLoad; import org.apache.hadoop.hbase.util.Bytes; @@ -60,6 +60,8 @@ import org.junit.experimental.categories import org.mockito.Mockito; import com.google.common.collect.Multimap; +import com.google.protobuf.RpcController; +import com.google.protobuf.ServiceException; /** * Test cases for the atomic load error handling of the bulk load functionality. @@ -259,7 +261,7 @@ public class TestLoadIncrementalHFilesSp } private HConnection getMockedConnection(final Configuration conf) - throws IOException { + throws IOException, ServiceException { HConnection c = Mockito.mock(HConnection.class); Mockito.when(c.getConfiguration()).thenReturn(conf); Mockito.doNothing().when(c).close(); @@ -271,10 +273,10 @@ public class TestLoadIncrementalHFilesSp thenReturn(loc); Mockito.when(c.locateRegion((byte[]) Mockito.any(), (byte[]) Mockito.any())). thenReturn(loc); - HRegionInterface hri = Mockito.mock(HRegionInterface.class); - Mockito.when(hri.bulkLoadHFiles(Mockito.anyList(), (byte [])Mockito.any())). - thenThrow(new IOException("injecting bulk load error")); - Mockito.when(c.getHRegionConnection(Mockito.anyString(), Mockito.anyInt())). + ClientProtocol hri = Mockito.mock(ClientProtocol.class); + Mockito.when(hri.bulkLoadHFile((RpcController)Mockito.any(), (BulkLoadHFileRequest)Mockito.any())). + thenThrow(new ServiceException(new IOException("injecting bulk load error"))); + Mockito.when(c.getClient(Mockito.anyString(), Mockito.anyInt())). thenReturn(hri); return c; }