Return-Path: X-Original-To: apmail-hbase-commits-archive@www.apache.org Delivered-To: apmail-hbase-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id B28289ED9 for ; Thu, 20 Oct 2011 00:43:23 +0000 (UTC) Received: (qmail 79267 invoked by uid 500); 20 Oct 2011 00:43:23 -0000 Delivered-To: apmail-hbase-commits-archive@hbase.apache.org Received: (qmail 79228 invoked by uid 500); 20 Oct 2011 00:43:23 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 79221 invoked by uid 99); 20 Oct 2011 00:43:23 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 20 Oct 2011 00:43:23 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 20 Oct 2011 00:43:19 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 0EC7A2388900 for ; Thu, 20 Oct 2011 00:42:58 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1186586 - in /hbase/trunk: ./ src/main/java/org/apache/hadoop/hbase/regionserver/ src/main/java/org/apache/hadoop/hbase/thrift/ Date: Thu, 20 Oct 2011 00:42:57 -0000 To: commits@hbase.apache.org From: jgray@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20111020004258.0EC7A2388900@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: jgray Date: Thu Oct 20 00:42:57 2011 New Revision: 1186586 URL: http://svn.apache.org/viewvc?rev=1186586&view=rev Log: HBASE-4460 Support running an embedded ThriftServer within a RegionServer (jgray) Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionThriftServer.java Modified: hbase/trunk/CHANGES.txt hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java hbase/trunk/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServer.java Modified: hbase/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/hbase/trunk/CHANGES.txt?rev=1186586&r1=1186585&r2=1186586&view=diff ============================================================================== --- hbase/trunk/CHANGES.txt (original) +++ hbase/trunk/CHANGES.txt Thu Oct 20 00:42:57 2011 @@ -1,5 +1,8 @@ HBase Change Log Release 0.93.0 - Unreleased + NEW FEATURE + HBASE-4460 Support running an embedded ThriftServer within a RegionServer (jgray) + IMPROVEMENT HBASE-4132 Extend the WALActionsListener API to accomodate log archival (dhruba borthakur) Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1186586&r1=1186585&r2=1186586&view=diff ============================================================================== --- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original) +++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Thu Oct 20 00:42:57 2011 @@ -300,6 +300,9 @@ public class HRegionServer implements HR // Cache configuration and block cache reference private final CacheConfig cacheConfig; + // reference to the Thrift Server. + volatile private HRegionThriftServer thriftServer; + /** * The server name the Master sees us as. Its made from the hostname the * master passes us, port, and server startcode. Gets set after registration @@ -617,6 +620,13 @@ public class HRegionServer implements HR HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY, HConstants.DEFAULT_HBASE_REGIONSERVER_LEASE_PERIOD), this.threadWakeFrequency); + + // Create the thread for the ThriftServer. + if (conf.getBoolean("hbase.regionserver.export.thrift", false)) { + thriftServer = new HRegionThriftServer(this, conf); + thriftServer.start(); + LOG.info("Started Thrift API from Region Server."); + } } /** @@ -685,6 +695,7 @@ public class HRegionServer implements HR } } // Run shutdown. + if (this.thriftServer != null) this.thriftServer.shutdown(); this.leases.closeAfterLeasesExpire(); this.rpcServer.stop(); if (this.splitLogWorker != null) { Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionThriftServer.java URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionThriftServer.java?rev=1186586&view=auto ============================================================================== --- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionThriftServer.java (added) +++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionThriftServer.java Thu Oct 20 00:42:57 2011 @@ -0,0 +1,229 @@ +/** + * Copyright 2011 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.NotServingRegionException; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.thrift.ThriftServer; +import org.apache.hadoop.hbase.thrift.ThriftUtilities; +import org.apache.hadoop.hbase.thrift.generated.Hbase; +import org.apache.hadoop.hbase.thrift.generated.IOError; +import org.apache.hadoop.hbase.thrift.generated.TRowResult; +import org.apache.thrift.protocol.TBinaryProtocol; +import org.apache.thrift.protocol.TCompactProtocol; +import org.apache.thrift.protocol.TProtocolFactory; +import org.apache.thrift.server.TNonblockingServer; +import org.apache.thrift.server.TServer; +import org.apache.thrift.server.TThreadPoolServer; +import org.apache.thrift.transport.TFramedTransport; +import org.apache.thrift.transport.TNonblockingServerSocket; +import org.apache.thrift.transport.TNonblockingServerTransport; +import org.apache.thrift.transport.TServerSocket; +import org.apache.thrift.transport.TServerTransport; +import org.apache.thrift.transport.TTransportFactory; + +/** + * HRegionThriftServer - this class starts up a Thrift server in the same + * JVM where the RegionServer is running. It inherits most of the + * functionality from the standard ThriftServer. This is good because + * we can maintain compatibility with applications that use the + * standard Thrift interface. For performance reasons, we can override + * methods to directly invoke calls into the HRegionServer and avoid the hop. + *

+ * This can be enabled with hbase.regionserver.export.thrift set to true. + */ +public class HRegionThriftServer extends Thread { + + public static final Log LOG = LogFactory.getLog(HRegionThriftServer.class); + public static final int DEFAULT_LISTEN_PORT = 9090; + + private HRegionServer rs; + private Configuration conf; + + private int port; + private boolean nonblocking; + private String bindIpAddress; + private String transport; + private String protocol; + volatile private TServer tserver; + + /** + * Create an instance of the glue object that connects the + * RegionServer with the standard ThriftServer implementation + */ + HRegionThriftServer(HRegionServer regionServer, Configuration conf) { + this.rs = regionServer; + this.conf = conf; + } + + /** + * Inherit the Handler from the standard ThriftServer. This allows us + * to use the default implementation for most calls. We override certain calls + * for performance reasons + */ + private class HBaseHandlerRegion extends ThriftServer.HBaseHandler { + + HBaseHandlerRegion(final Configuration conf) throws IOException { + super(conf); + initialize(conf); + } + + // TODO: Override more methods to short-circuit for performance + + /** + * Get a record. Short-circuit to get better performance. + */ + @Override + public List getRowWithColumnsTs(ByteBuffer tableName, + ByteBuffer rowb, + List columns, + long timestamp) + throws IOError { + try { + byte [] row = rowb.array(); + HTable table = getTable(tableName.array()); + HRegionLocation location = table.getRegionLocation(row); + byte[] regionName = location.getRegionInfo().getEncodedNameAsBytes(); + + if (columns == null) { + Get get = new Get(row); + get.setTimeRange(Long.MIN_VALUE, timestamp); + Result result = rs.get(regionName, get); + return ThriftUtilities.rowResultFromHBase(result); + } + ByteBuffer[] columnArr = columns.toArray( + new ByteBuffer[columns.size()]); + Get get = new Get(row); + for(ByteBuffer column : columnArr) { + byte [][] famAndQf = KeyValue.parseColumn(column.array()); + if (famAndQf.length == 1) { + get.addFamily(famAndQf[0]); + } else { + get.addColumn(famAndQf[0], famAndQf[1]); + } + } + get.setTimeRange(Long.MIN_VALUE, timestamp); + Result result = rs.get(regionName, get); + return ThriftUtilities.rowResultFromHBase(result); + } catch (NotServingRegionException e) { + LOG.info("ThriftServer redirecting getRowWithColumnsTs"); + return super.getRowWithColumnsTs(tableName, rowb, columns, timestamp); + } catch (IOException e) { + throw new IOError(e.getMessage()); + } + } + } + + /** + * Read and initialize config parameters + */ + private void initialize(Configuration conf) { + this.port = conf.getInt("hbase.regionserver.thrift.port", + DEFAULT_LISTEN_PORT); + this.bindIpAddress = conf.get("hbase.regionserver.thrift.ipaddress"); + this.protocol = conf.get("hbase.regionserver.thrift.protocol"); + this.transport = conf.get("hbase.regionserver.thrift.transport"); + this.nonblocking = conf.getBoolean("hbase.regionserver.thrift.nonblocking", + false); + } + + /** + * Stop ThriftServer + */ + void shutdown() { + if (tserver != null) { + tserver.stop(); + tserver = null; + } + } + + @Override + public void run() { + try { + HBaseHandlerRegion handler = new HBaseHandlerRegion(this.conf); + Hbase.Processor processor = new Hbase.Processor(handler); + + TProtocolFactory protocolFactory; + if (this.protocol != null && this.protocol.equals("compact")) { + protocolFactory = new TCompactProtocol.Factory(); + } else { + protocolFactory = new TBinaryProtocol.Factory(); + } + + if (this.nonblocking) { + TNonblockingServerTransport serverTransport = + new TNonblockingServerSocket(this.port); + TFramedTransport.Factory transportFactory = + new TFramedTransport.Factory(); + + TNonblockingServer.Args serverArgs = + new TNonblockingServer.Args(serverTransport); + serverArgs.processor(processor); + serverArgs.transportFactory(transportFactory); + serverArgs.protocolFactory(protocolFactory); + LOG.info("starting HRegionServer Nonblocking Thrift server on " + + this.port); + LOG.info("HRegionServer Nonblocking Thrift server does not " + + "support address binding."); + tserver = new TNonblockingServer(serverArgs); + } else { + InetAddress listenAddress = null; + if (this.bindIpAddress != null) { + listenAddress = InetAddress.getByName(this.bindIpAddress); + } else { + listenAddress = InetAddress.getLocalHost(); + } + TServerTransport serverTransport = new TServerSocket( + new InetSocketAddress(listenAddress, port)); + + TTransportFactory transportFactory; + if (this.transport != null && this.transport.equals("framed")) { + transportFactory = new TFramedTransport.Factory(); + } else { + transportFactory = new TTransportFactory(); + } + + TThreadPoolServer.Args serverArgs = new TThreadPoolServer.Args(serverTransport); + serverArgs.processor(processor); + serverArgs.protocolFactory(protocolFactory); + serverArgs.transportFactory(transportFactory); + LOG.info("starting HRegionServer ThreadPool Thrift server on " + + listenAddress + ":" + this.port); + tserver = new TThreadPoolServer(serverArgs); + } + tserver.serve(); + } catch (Exception e) { + LOG.warn("Unable to start HRegionServerThrift interface.", e); + } + } +} Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServer.java URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServer.java?rev=1186586&r1=1186585&r2=1186586&view=diff ============================================================================== --- hbase/trunk/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServer.java (original) +++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServer.java Thu Oct 20 00:42:57 2011 @@ -195,12 +195,12 @@ public class ThriftServer { * Constructs an HBaseHandler object. * @throws IOException */ - HBaseHandler() + protected HBaseHandler() throws IOException { this(HBaseConfiguration.create()); } - HBaseHandler(final Configuration c) + protected HBaseHandler(final Configuration c) throws IOException { this.conf = c; admin = new HBaseAdmin(conf);