Subject: svn commit: r909235 - in /hadoop/hbase/trunk: ./ src/java/org/apache/hadoop/hbase/client/ src/java/org/apache/hadoop/hbase/io/ src/java/org/apache/hadoop/hbase/ipc/ src/java/org/apache/hadoop/hbase/regionserver/ src/test/org/apache/hadoop/hbase/
Date: Fri, 12 Feb 2010 05:06:07 -0000
To: hbase-commits@hadoop.apache.org
From: rawson@apache.org

Author: rawson
Date: Fri Feb 12 05:06:06 2010
New Revision: 909235

URL: http://svn.apache.org/viewvc?rev=909235&view=rev
Log:
HBASE-2066

Added:
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/MultiPut.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/MultiPutResponse.java
    hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/TestMultiParallelPut.java
Modified:
    hadoop/hbase/trunk/CHANGES.txt
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HConnection.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HConnectionManager.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HTable.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/ipc/HBaseRPCProtocolVersion.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
    hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/HBaseClusterTestCase.java
    hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/MultiRegionTable.java

Modified: hadoop/hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/CHANGES.txt?rev=909235&r1=909234&r2=909235&view=diff
==============================================================================
--- hadoop/hbase/trunk/CHANGES.txt (original)
+++ hadoop/hbase/trunk/CHANGES.txt Fri Feb 12 05:06:06 2010
@@ -358,6 +358,7 @@
    HBASE-2209  Support of List [ ] in HBaseOutputWritable for serialization
                (Kay Kay via Stack)
    HBASE-2177  Add timestamping to gc logging option
+   HBASE-2066  Perf: parallelize puts
 
 NEW FEATURES
    HBASE-1961  HBase EC2 scripts
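The interface change below adds HConnection.processBatchOfPuts, which takes a caller-supplied ExecutorService so a batch of puts can be dispatched to several region servers in parallel. A minimal sketch of how a caller might drive the new method; the table name, pool size, and class name here are invented for illustration (HTable itself manages its own pool, as the later diff shows):

    import java.io.IOException;
    import java.util.List;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    import org.apache.hadoop.hbase.client.HConnection;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.util.Bytes;

    public class BatchPutSketch {
      // 'connection' and the puts list come from elsewhere; the table
      // name and fixed pool size are made up for this sketch.
      public static void runBatch(HConnection connection, List<Put> puts)
          throws IOException {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
          // Retries internally; on a clean return the list is empty, and
          // a RetriesExhaustedException reports puts that never landed.
          connection.processBatchOfPuts(puts, Bytes.toBytes("test_table"), pool);
        } finally {
          pool.shutdown();
        }
      }
    }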
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HConnection.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HConnection.java?rev=909235&r1=909234&r2=909235&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HConnection.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HConnection.java Fri Feb 12 05:06:06 2010
@@ -22,6 +22,7 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.ExecutorService;
 
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.HServerAddress;
@@ -214,4 +215,9 @@
    */
   public int processBatchOfDeletes(List<Delete> list, byte[] tableName)
   throws IOException;
-}
\ No newline at end of file
+
+  public void processBatchOfPuts(List<Put> list,
+                                 final byte[] tableName, ExecutorService pool) throws IOException;
+
+
+}

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HConnectionManager.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HConnectionManager.java?rev=909235&r1=909234&r2=909235&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HConnectionManager.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HConnectionManager.java Fri Feb 12 05:06:06 2010
@@ -29,6 +29,10 @@
 import java.util.Map;
 import java.util.TreeSet;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.logging.Log;
@@ -260,7 +264,7 @@
     private final Object userRegionLock = new Object();
 
     private volatile Configuration conf;
-    
+
     // Known region HServerAddress.toString() -> HRegionInterface
     private final Map<String, HRegionInterface> servers =
       new ConcurrentHashMap<String, HRegionInterface>();
@@ -830,37 +834,40 @@
      * requirements.
      */
     private void deleteCachedLocation(final byte [] tableName,
-      final byte [] row) {
-      SoftValueSortedMap<byte [], HRegionLocation> tableLocations =
-        getTableLocations(tableName);
+      final byte [] row) {
+      synchronized (this.cachedRegionLocations) {
+        SoftValueSortedMap<byte [], HRegionLocation> tableLocations =
+          getTableLocations(tableName);
 
-      // start to examine the cache. we can only do cache actions
-      // if there's something in the cache for this table.
-      if (!tableLocations.isEmpty()) {
-        // cut the cache so that we only get the part that could contain
-        // regions that match our key
-        SoftValueSortedMap<byte [], HRegionLocation> matchingRegions =
-          tableLocations.headMap(row);
-
-        // if that portion of the map is empty, then we're done. otherwise,
-        // we need to examine the cached location to verify that it is
-        // a match by end key as well.
-        if (!matchingRegions.isEmpty()) {
-          HRegionLocation possibleRegion =
-            matchingRegions.get(matchingRegions.lastKey());
-          byte [] endKey = possibleRegion.getRegionInfo().getEndKey();
-
-          // by nature of the map, we know that the start key has to be <
-          // otherwise it wouldn't be in the headMap.
-          if (KeyValue.getRowComparator(tableName).compareRows(endKey, 0, endKey.length,
-              row, 0, row.length) <= 0) {
-            // delete any matching entry
-            HRegionLocation rl =
-              tableLocations.remove(matchingRegions.lastKey());
-            if (rl != null && LOG.isDebugEnabled()) {
-              LOG.debug("Removed " + rl.getRegionInfo().getRegionNameAsString() +
-                " for tableName=" + Bytes.toString(tableName) + " from cache " +
-                "because of " + Bytes.toStringBinary(row));
+        // start to examine the cache. we can only do cache actions
+        // if there's something in the cache for this table.
+        if (!tableLocations.isEmpty()) {
+          // cut the cache so that we only get the part that could contain
+          // regions that match our key
+          SoftValueSortedMap<byte [], HRegionLocation> matchingRegions =
+            tableLocations.headMap(row);
+
+          // if that portion of the map is empty, then we're done. otherwise,
+          // we need to examine the cached location to verify that it is
+          // a match by end key as well.
+          if (!matchingRegions.isEmpty()) {
+            HRegionLocation possibleRegion =
+              matchingRegions.get(matchingRegions.lastKey());
+            byte [] endKey = possibleRegion.getRegionInfo().getEndKey();
+
+            // by nature of the map, we know that the start key has to be <
+            // otherwise it wouldn't be in the headMap.
+            if (Bytes.equals(endKey, HConstants.EMPTY_END_ROW) ||
+                KeyValue.getRowComparator(tableName).compareRows(endKey, 0, endKey.length,
+                  row, 0, row.length) > 0) {
+              // delete any matching entry
+              HRegionLocation rl =
+                tableLocations.remove(matchingRegions.lastKey());
+              if (rl != null && LOG.isDebugEnabled()) {
+                LOG.debug("Removed " + rl.getRegionInfo().getRegionNameAsString() +
+                  " for tableName=" + Bytes.toString(tableName) + " from cache " +
+                  "because of " + Bytes.toStringBinary(row));
+              }
             }
           }
         }
@@ -909,7 +916,7 @@
           " is " + location.getServerAddress());
       }
     }
-    
+
     public HRegionInterface getHRegionConnection(
         HServerAddress regionServer, boolean getMaster)
     throws IOException {
@@ -1295,5 +1302,127 @@
         }
       }
     }
-  }
+
+    public void processBatchOfPuts(List<Put> list,
+                                   final byte[] tableName, ExecutorService pool) throws IOException {
+      for ( int tries = 0 ; tries < numRetries && !list.isEmpty(); ++tries) {
+        Collections.sort(list);
+        Map<HServerAddress, MultiPut> regionPuts =
+            new HashMap<HServerAddress, MultiPut>();
+        // step 1:
+        // break up into regionserver-sized chunks and build the data structs
+        for ( Put put : list ) {
+          byte [] row = put.getRow();
+
+          HRegionLocation loc = locateRegion(tableName, row, true);
+          HServerAddress address = loc.getServerAddress();
+          byte [] regionName = loc.getRegionInfo().getRegionName();
+
+          MultiPut mput = regionPuts.get(address);
+          if (mput == null) {
+            mput = new MultiPut(address);
+            regionPuts.put(address, mput);
+          }
+          mput.add(regionName, put);
+        }
+
+        // step 2:
+        // make the requests
+        // Discard the map, just use a list now, makes error recovery easier.
+        List<MultiPut> multiPuts = new ArrayList<MultiPut>(regionPuts.values());
+
+        List<Future<MultiPutResponse>> futures =
+            new ArrayList<Future<MultiPutResponse>>(regionPuts.size());
+        for ( MultiPut put : multiPuts ) {
+          futures.add(pool.submit(createPutCallable(put.address,
+              put,
+              tableName)));
+        }
+        // RUN!
+        List<Put> failed = new ArrayList<Put>();
+
+        // step 3:
+        // collect the failures and tries from step 1.
+        for (int i = 0; i < futures.size(); i++ ) {
+          Future<MultiPutResponse> future = futures.get(i);
+          MultiPut request = multiPuts.get(i);
+          try {
+            MultiPutResponse resp = future.get();
+
+            // For each region
+            for (Map.Entry<byte[], List<Put>> e : request.puts.entrySet()) {
+              Integer result = resp.getAnswer(e.getKey());
+              if (result == null) {
+                // failed
+                LOG.debug("Failed all for region: " +
+                    Bytes.toStringBinary(e.getKey()) + ", removing from cache");
+                failed.addAll(e.getValue());
+              } else if (result >= 0) {
+                // some failures
+                List<Put> lst = e.getValue();
+                failed.addAll(lst.subList(result, lst.size()));
+                LOG.debug("Failed past " + result + " for region: " +
+                    Bytes.toStringBinary(e.getKey()) + ", removing from cache");
+              }
+            }
+          } catch (InterruptedException e) {
+            // go into the failed list.
+            LOG.debug("Failed all from " + request.address, e);
+            failed.addAll(request.allPuts());
+          } catch (ExecutionException e) {
+            System.out.println(e);
+            // all go into the failed list.
+            LOG.debug("Failed all from " + request.address, e);
+            failed.addAll(request.allPuts());
+          }
+        }
+        list.clear();
+        if (!failed.isEmpty()) {
+          for (Put failedPut: failed) {
+            deleteCachedLocation(tableName, failedPut.getRow());
+          }
+
+          list.addAll(failed);
+
+          long sleepTime = getPauseTime(tries);
+          LOG.debug("processBatchOfPuts had some failures, sleeping for " + sleepTime +
+              " ms!");
+          try {
+            Thread.sleep(sleepTime);
+          } catch (InterruptedException e) {
+
+          }
+        }
+      }
+      if (!list.isEmpty()) {
+        // ran out of retries and didn't succeed everything!
+        throw new RetriesExhaustedException("Still had " + list.size() + " puts left after retrying " +
+            numRetries + " times. Should have detail on which Regions failed the most");
+      }
+    }
+
+
+    private Callable<MultiPutResponse> createPutCallable(
+        final HServerAddress address, final MultiPut puts,
+        final byte [] tableName) {
+      final HConnection connection = this;
+      return new Callable<MultiPutResponse>() {
+        public MultiPutResponse call() throws IOException {
+          return getRegionServerWithRetries(
+              new ServerCallable<MultiPutResponse>(connection, tableName, null) {
+                public MultiPutResponse call() throws IOException {
+                  MultiPutResponse resp = server.multiPut(puts);
+                  resp.request = puts;
+                  return resp;
+                }
+                @Override
+                public void instantiateServer(boolean reload) throws IOException {
+                  server = connection.getHRegionConnection(address);
+                }
+              }
+          );
+        }
+      };
+    }
+  }
 }
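The failure-recovery loop above relies on a simple contract: for each region, the server answers with the index of the first put that failed (everything before it succeeded), or gives no answer at all when the whole region's batch failed. A hedged illustration of that bookkeeping, using plain local lists instead of a live cluster; the class and method names are made up:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class PartialFailureSketch {
      // Mirrors the subList logic in processBatchOfPuts: 'result' is the
      // index of the first failed put for one region, or null when the
      // region's entire batch failed.
      static <P> List<P> requeue(List<P> sent, Integer result) {
        if (result == null) {
          return new ArrayList<P>(sent);                            // retry all
        }
        return new ArrayList<P>(sent.subList(result, sent.size())); // retry tail
      }

      public static void main(String[] args) {
        List<String> sent = Arrays.asList("p0", "p1", "p2", "p3");
        System.out.println(requeue(sent, 2));    // [p2, p3]
        System.out.println(requeue(sent, null)); // [p0, p1, p2, p3]
      }
    }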
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HTable.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HTable.java?rev=909235&r1=909234&r2=909235&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HTable.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HTable.java Fri Feb 12 05:06:06 2010
@@ -27,6 +27,12 @@
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -65,9 +71,10 @@
   private boolean autoFlush;
   private long currentWriteBufferSize;
   protected int scannerCaching;
-  private long maxScannerResultSize;
   private int maxKeyValueSize;
 
+  private long maxScannerResultSize;
+
   /**
    * Creates an object to access a HBase table
    *
@@ -102,6 +109,7 @@
     this(conf, Bytes.toBytes(tableName));
   }
 
+
   /**
    * Creates an object to access a HBase table.
    *
@@ -126,12 +134,37 @@
     this.autoFlush = true;
     this.currentWriteBufferSize = 0;
     this.scannerCaching = conf.getInt("hbase.client.scanner.caching", 1);
+
     this.maxScannerResultSize = conf.getLong(
       HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY,
       HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE);
     this.maxKeyValueSize = conf.getInt("hbase.client.keyvalue.maxsize", -1);
+
+    int nrHRS = getCurrentNrHRS();
+    int nrThreads = conf.getInt("hbase.htable.threads.max", nrHRS);
+
+    // Unfortunately Executors.newCachedThreadPool does not allow us to
+    // set the maximum size of the pool, so we have to do it ourselves.
+    this.pool = new ThreadPoolExecutor(0, nrThreads,
+        60, TimeUnit.SECONDS,
+        new LinkedBlockingQueue<Runnable>(),
+        new DaemonThreadFactory());
+  }
+
+  /**
+   * TODO Might want to change this to public, would be nice if the number
+   * of threads would automatically change when servers were added and removed
+   * @return the number of region servers that are currently running
+   * @throws IOException
+   */
+  private int getCurrentNrHRS() throws IOException {
+    HBaseAdmin admin = new HBaseAdmin(this.configuration);
+    return admin.getClusterStatus().getServers();
   }
 
+  // For multiput
+  private ExecutorService pool;
+
   /**
    * @param tableName name of table to check
    * @return true if table is on-line
@@ -591,11 +624,11 @@
    * @throws IOException
    */
   public void flushCommits() throws IOException {
-    int last = 0;
     try {
-      last = connection.processBatchOfRows(writeBuffer, tableName);
+      connection.processBatchOfPuts(writeBuffer,
+          tableName, pool);
     } finally {
-      writeBuffer.subList(0, last).clear();
+      // the write buffer was adjusted by processBatchOfPuts
       currentWriteBufferSize = 0;
       for (int i = 0; i < writeBuffer.size(); i++) {
         currentWriteBufferSize += writeBuffer.get(i).heapSize();
@@ -716,7 +749,7 @@
   public ArrayList<Put> getWriteBuffer() {
     return writeBuffer;
   }
-  
+
   /**
    * Implements the scanner interface for the HBase client.
    * If there are multiple regions in a table, this scanner will iterate
@@ -1016,4 +1049,31 @@
       };
     }
   }
+
+  static class DaemonThreadFactory implements ThreadFactory {
+    static final AtomicInteger poolNumber = new AtomicInteger(1);
+    final ThreadGroup group;
+    final AtomicInteger threadNumber = new AtomicInteger(1);
+    final String namePrefix;
+
+    DaemonThreadFactory() {
+      SecurityManager s = System.getSecurityManager();
+      group = (s != null)? s.getThreadGroup() :
+          Thread.currentThread().getThreadGroup();
+      namePrefix = "pool-" +
+          poolNumber.getAndIncrement() +
+          "-thread-";
+    }
+
+    public Thread newThread(Runnable r) {
+      Thread t = new Thread(group, r,
+          namePrefix + threadNumber.getAndIncrement(),
+          0);
+      if (!t.isDaemon())
+        t.setDaemon(true);
+      if (t.getPriority() != Thread.NORM_PRIORITY)
+        t.setPriority(Thread.NORM_PRIORITY);
+      return t;
+    }
+  }
 }
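HTable now builds its own ThreadPoolExecutor because Executors.newCachedThreadPool offers no maximum size; the cap defaults to the number of live region servers, with hbase.htable.threads.max as an override. A stripped-down sketch of the same construction (class and method names invented, and the default thread factory standing in for DaemonThreadFactory):

    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    public class PoolSketch {
      // Same constructor arguments as the HTable diff above: zero core
      // threads, a cap of maxThreads, a 60-second idle timeout, and an
      // unbounded work queue feeding the workers.
      public static ThreadPoolExecutor newBoundedPool(int maxThreads) {
        return new ThreadPoolExecutor(0, maxThreads,
            60, TimeUnit.SECONDS,
            new LinkedBlockingQueue<Runnable>());
      }
    }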
Added: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/MultiPut.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/MultiPut.java?rev=909235&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/MultiPut.java (added)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/MultiPut.java Fri Feb 12 05:06:06 2010
@@ -0,0 +1,107 @@
+/*
+ * Copyright 2009 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.client;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.HServerAddress;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.DataInput;
+import java.util.List;
+import java.util.Map;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.TreeMap;
+
+public class MultiPut implements Writable {
+  public HServerAddress address; // client code ONLY
+
+  // map of regions to lists of puts for that region.
+  public Map<byte[], List<Put>> puts =
+      new TreeMap<byte[], List<Put>>(Bytes.BYTES_COMPARATOR);
+
+  public MultiPut() {}
+
+  public MultiPut(HServerAddress a) {
+    address = a;
+  }
+
+  public int size() {
+    int size = 0;
+    for( List<Put> l : puts.values()) {
+      size += l.size();
+    }
+    return size;
+  }
+
+  public void add(byte[] regionName, Put aPut) {
+    List<Put> rsput = puts.get(regionName);
+    if (rsput == null) {
+      rsput = new ArrayList<Put>();
+      puts.put(regionName, rsput);
+    }
+    rsput.add(aPut);
+  }
+
+  public Collection<Put> allPuts() {
+    List<Put> res = new ArrayList<Put>();
+    for ( List<Put> pp : puts.values() ) {
+      res.addAll(pp);
+    }
+    return res;
+  }
+
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(puts.size());
+    for( Map.Entry<byte[], List<Put>> e : puts.entrySet()) {
+      Bytes.writeByteArray(out, e.getKey());
+
+      List<Put> ps = e.getValue();
+      out.writeInt(ps.size());
+      for( Put p : ps ) {
+        p.write(out);
+      }
+    }
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    puts.clear();
+
+    int mapSize = in.readInt();
+
+    for (int i = 0 ; i < mapSize; i++) {
+      byte[] key = Bytes.readByteArray(in);
+
+      int listSize = in.readInt();
+      List<Put> ps = new ArrayList<Put>(listSize);
+      for ( int j = 0 ; j < listSize; j++ ) {
+        Put put = new Put();
+        put.readFields(in);
+        ps.add(put);
+      }
+      puts.put(key, ps);
+    }
+  }
+}
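MultiPut's Writable encoding is a region count followed by (region name, put count, puts) groups, so it round-trips through plain data streams. A quick hedged check of that encoding; the region, row, family, and qualifier names are invented:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;

    import org.apache.hadoop.hbase.client.MultiPut;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.util.Bytes;

    public class MultiPutRoundTrip {
      public static void main(String[] args) throws Exception {
        MultiPut mput = new MultiPut();
        Put put = new Put(Bytes.toBytes("row1"));
        put.add(Bytes.toBytes("family"), Bytes.toBytes("qual"), Bytes.toBytes("value"));
        mput.add(Bytes.toBytes("some_region"), put);

        // write(), then readFields() on a fresh instance.
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        mput.write(new DataOutputStream(bos));

        MultiPut copy = new MultiPut();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(bos.toByteArray())));
        System.out.println("puts after round trip: " + copy.size()); // expect 1
      }
    }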
Added: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/MultiPutResponse.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/MultiPutResponse.java?rev=909235&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/MultiPutResponse.java (added)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/MultiPutResponse.java Fri Feb 12 05:06:06 2010
@@ -0,0 +1,71 @@
+/*
+ * Copyright 2009 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.client;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.DataInput;
+import java.util.Map;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.TreeMap;
+
+public class MultiPutResponse implements Writable {
+
+  public MultiPut request; // used in client code ONLY
+
+  public Map<byte[], Integer> answers = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
+
+  public MultiPutResponse() {}
+
+  public void addResult(byte[] regionName, int result) {
+    answers.put(regionName, result);
+  }
+
+  public Integer getAnswer(byte[] region) {
+    return answers.get(region);
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(answers.size());
+    for( Map.Entry<byte[], Integer> e : answers.entrySet()) {
+      Bytes.writeByteArray(out, e.getKey());
+      out.writeInt(e.getValue());
+    }
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    answers.clear();
+
+    int mapSize = in.readInt();
+    for( int i = 0 ; i < mapSize ; i++ ) {
+      byte[] key = Bytes.readByteArray(in);
+      int value = in.readInt();
+
+      answers.put(key, value);
+    }
+  }
+}

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java?rev=909235&r1=909234&r2=909235&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java Fri Feb 12 05:06:06 2010
@@ -48,6 +48,8 @@
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.MultiPutResponse;
+import org.apache.hadoop.hbase.client.MultiPut;
 import org.apache.hadoop.hbase.filter.*;
 import org.apache.hadoop.hbase.io.HbaseMapWritable;
 import org.apache.hadoop.io.MapWritable;
@@ -157,6 +159,10 @@
     addToMap(FirstKeyOnlyFilter.class, code++);
 
     addToMap(Delete [].class, code++);
+
+    addToMap(MultiPut.class, code++);
+    addToMap(MultiPutResponse.class, code++);
+
     addToMap(HLog.Entry.class, code++);
     addToMap(HLog.Entry[].class, code++);
     addToMap(HLogKey.class, code++);
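HbaseObjectWritable identifies each class on the wire by its position in the addToMap sequence above, so inserting MultiPut and MultiPutResponse ahead of the HLog entries shifts every later code; that renumbering, together with the new RPC method itself, is why client and server must agree on protocol version 23 in the next diff. A toy sketch of the same positional-code scheme (the class names registered here are arbitrary):

    import java.util.HashMap;
    import java.util.Map;

    public class ClassCodeSketch {
      private static final Map<Class<?>, Integer> CODE_BY_CLASS =
          new HashMap<Class<?>, Integer>();

      private static void addToMap(Class<?> clazz, int code) {
        CODE_BY_CLASS.put(clazz, code);
      }

      static {
        int code = 0;
        addToMap(String.class, code++);  // existing entries keep their codes
        addToMap(Integer.class, code++); // a class inserted here would shift
        addToMap(Long.class, code++);    // every later code, so both ends of
                                         // the wire must use the same ordering
      }
    }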
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/ipc/HBaseRPCProtocolVersion.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/ipc/HBaseRPCProtocolVersion.java?rev=909235&r1=909234&r2=909235&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/ipc/HBaseRPCProtocolVersion.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/ipc/HBaseRPCProtocolVersion.java Fri Feb 12 05:06:06 2010
@@ -74,7 +74,8 @@
  * <li>Version 20: Backed Transaction HBase out of HBase core.</li>
  * <li>Version 21: HBASE-1665.</li>
  * <li>Version 22: HBASE-2209. Added List support to RPC</li>
+ * <li>Version 23: HBASE-2066, multi-put.</li>
  * </ul>
  */
-  public static final long versionID = 22L;
+  public static final long versionID = 23L;
 }

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java?rev=909235&r1=909234&r2=909235&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java Fri Feb 12 05:06:06 2010
@@ -29,6 +29,8 @@
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.MultiPutResponse;
+import org.apache.hadoop.hbase.client.MultiPut;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 
 /**
@@ -245,4 +247,15 @@
    * @throws IOException
    */
  public HServerInfo getHServerInfo() throws IOException;
+
+
+  /**
+   * Multi put for putting multiple regions worth of puts at once.
+   *
+   * @param puts the request
+   * @return the reply
+   * @throws IOException
+   */
+  public MultiPutResponse multiPut(MultiPut puts) throws IOException;
+
 }
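Region servers now expose multiPut directly, so one RPC can carry puts for every region a server hosts. A hedged sketch of invoking it against an already-obtained HRegionInterface proxy; the connection plumbing is omitted and the class and method names are invented (see createPutCallable above for how the client obtains the proxy with retries):

    import java.io.IOException;

    import org.apache.hadoop.hbase.client.MultiPut;
    import org.apache.hadoop.hbase.client.MultiPutResponse;
    import org.apache.hadoop.hbase.ipc.HRegionInterface;

    public class MultiPutCallSketch {
      // 'server' is assumed to be a live proxy for one region server.
      static void send(HRegionInterface server, MultiPut puts) throws IOException {
        MultiPutResponse resp = server.multiPut(puts);
        // Each region answers with the index of its first failed put; a
        // missing answer means that region's whole batch failed.
        System.out.println("regions answered: " + resp.answers.size());
      }
    }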
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=909235&r1=909234&r2=909235&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Fri Feb 12 05:06:06 2010
@@ -82,6 +82,8 @@
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.ServerConnection;
 import org.apache.hadoop.hbase.client.ServerConnectionManager;
+import org.apache.hadoop.hbase.client.MultiPutResponse;
+import org.apache.hadoop.hbase.client.MultiPut;
 import org.apache.hadoop.hbase.io.hfile.LruBlockCache;
 import org.apache.hadoop.hbase.ipc.HBaseRPC;
 import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler;
@@ -1701,17 +1703,17 @@
       if (!region.getRegionInfo().isMetaTable()) {
         this.cacheFlusher.reclaimMemStoreMemory();
       }
-      Integer[] locks = new Integer[puts.length];
       for (i = 0; i < puts.length; i++) {
         this.requestCount.incrementAndGet();
-        locks[i] = getLockFromId(puts[i].getLockId());
-        region.put(puts[i], locks[i]);
+        Integer lock = getLockFromId(puts[i].getLockId());
+        region.put(puts[i], lock);
       }
 
     } catch (WrongRegionException ex) {
       LOG.debug("Batch puts: " + i, ex);
       return i;
     } catch (NotServingRegionException ex) {
+      LOG.debug("Batch puts: " + i, ex);
       return i;
     } catch (Throwable t) {
       throw convertThrowableToIOE(cleanup(t));
@@ -2427,4 +2429,20 @@
     doMain(args, regionServerClass);
   }
 
+
+  @Override
+  public MultiPutResponse multiPut(MultiPut puts) throws IOException {
+    MultiPutResponse resp = new MultiPutResponse();
+
+    // do each region as its own.
+    for( Map.Entry<byte[], List<Put>> e: puts.puts.entrySet()) {
+      int result = put(e.getKey(), e.getValue().toArray(new Put[]{}));
+      resp.addResult(e.getKey(), result);
+
+      e.getValue().clear(); // clear some RAM
+    }
+
+    return resp;
+  }
+
 }

Modified: hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/HBaseClusterTestCase.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/HBaseClusterTestCase.java?rev=909235&r1=909234&r2=909235&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/HBaseClusterTestCase.java (original)
+++ hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/HBaseClusterTestCase.java Fri Feb 12 05:06:06 2010
@@ -82,6 +82,8 @@
   }
 
   /**
+   * Subclass hook.
+   *
    * Run after dfs is ready but before hbase cluster is started up.
    */
   protected void preHBaseClusterSetup() throws Exception {

Modified: hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/MultiRegionTable.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/MultiRegionTable.java?rev=909235&r1=909234&r2=909235&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/MultiRegionTable.java (original)
+++ hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/MultiRegionTable.java Fri Feb 12 05:06:06 2010
@@ -28,7 +28,7 @@
  * Utility class to build a table of multiple regions.
  */
 public class MultiRegionTable extends HBaseClusterTestCase {
-  private static final byte [][] KEYS = {
+  protected static final byte [][] KEYS = {
     HConstants.EMPTY_BYTE_ARRAY,
     Bytes.toBytes("bbb"),
     Bytes.toBytes("ccc"),
@@ -63,8 +63,13 @@
    * @param familyName the family to populate.
    */
   public MultiRegionTable(final String familyName) {
-    super();
-    this.columnFamily = Bytes.toBytes(familyName);
+    this(1, familyName);
+  }
+
+  public MultiRegionTable(int nServers, final String familyName) {
+    super(nServers);
+
+    this.columnFamily = Bytes.toBytes(familyName);
     // These are needed for the new and improved Map/Reduce framework
     System.setProperty("hadoop.log.dir", conf.get("hadoop.log.dir"));
     conf.set("mapred.output.dir", conf.get("hadoop.tmp.dir"));
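MultiRegionTable now takes a region-server count, so fixtures can spin up more than one server; the new parallel-put test below passes 2. A minimal hypothetical subclass using the new constructor (the class and family names are made up):

    import org.apache.hadoop.hbase.MultiRegionTable;

    // Hypothetical test fixture: a two-region-server mini-cluster
    // populated with a single column family.
    public class TwoServerTableTest extends MultiRegionTable {
      public TwoServerTableTest() {
        super(2, "family");
      }
    }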
Added: hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/TestMultiParallelPut.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/TestMultiParallelPut.java?rev=909235&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/TestMultiParallelPut.java (added)
+++ hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/TestMultiParallelPut.java Fri Feb 12 05:06:06 2010
@@ -0,0 +1,97 @@
+/*
+ * Copyright 2009 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase;
+
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import java.util.List;
+import java.util.ArrayList;
+
+public class TestMultiParallelPut extends MultiRegionTable {
+  private static final byte[] VALUE = Bytes.toBytes("value");
+  private static final byte[] QUALIFIER = Bytes.toBytes("qual");
+  private static final String FAMILY = "family";
+  private static final String TEST_TABLE = "test_table";
+  private static final byte[] BYTES_FAMILY = Bytes.toBytes(FAMILY);
+
+
+  public TestMultiParallelPut() {
+    super(2, FAMILY);
+    desc = new HTableDescriptor(TEST_TABLE);
+    desc.addFamily(new HColumnDescriptor(FAMILY));
+
+    makeKeys();
+  }
+
+  private void makeKeys() {
+    for (byte [] k : KEYS) {
+      byte [] cp = new byte[k.length+1];
+      System.arraycopy(k, 0, cp, 0, k.length);
+      cp[k.length] = 1;
+
+      keys.add(cp);
+    }
+  }
+
+  List<byte[]> keys = new ArrayList<byte[]>();
+
+  public void testMultiPut() throws Exception {
+
+    HTable table = new HTable(TEST_TABLE);
+    table.setAutoFlush(false);
+    table.setWriteBufferSize(10 * 1024 * 1024);
+
+    for ( byte [] k : keys ) {
+      Put put = new Put(k);
+      put.add(BYTES_FAMILY, QUALIFIER, VALUE);
+
+      table.put(put);
+    }
+
+    table.flushCommits();
+
+    for (byte [] k : keys ) {
+      Get get = new Get(k);
+      get.addColumn(BYTES_FAMILY, QUALIFIER);
+
+      Result r = table.get(get);
+
+      assertTrue(r.containsColumn(BYTES_FAMILY, QUALIFIER));
+      assertEquals(0,
+          Bytes.compareTo(VALUE,
+              r.getValue(BYTES_FAMILY, QUALIFIER)));
+    }
+
+    HBaseAdmin admin = new HBaseAdmin(conf);
+    ClusterStatus cs = admin.getClusterStatus();
+
+    assertEquals(2, cs.getServers());
+    for ( HServerInfo info : cs.getServerInfo()) {
+      System.out.println(info);
+      assertTrue( info.getLoad().getNumberOfRegions() > 10);
+    }
+  }
+}