hbase-commits mailing list archives

From: raw...@apache.org
Subject: svn commit: r909235 - in /hadoop/hbase/trunk: ./ src/java/org/apache/hadoop/hbase/client/ src/java/org/apache/hadoop/hbase/io/ src/java/org/apache/hadoop/hbase/ipc/ src/java/org/apache/hadoop/hbase/regionserver/ src/test/org/apache/hadoop/hbase/
Date: Fri, 12 Feb 2010 05:06:07 GMT
Author: rawson
Date: Fri Feb 12 05:06:06 2010
New Revision: 909235

URL: http://svn.apache.org/viewvc?rev=909235&view=rev
Log:
HBASE-2066  Perf: parallelize puts


Added:
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/MultiPut.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/MultiPutResponse.java
    hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/TestMultiParallelPut.java
Modified:
    hadoop/hbase/trunk/CHANGES.txt
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HConnection.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HConnectionManager.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HTable.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/ipc/HBaseRPCProtocolVersion.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
    hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/HBaseClusterTestCase.java
    hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/MultiRegionTable.java

Modified: hadoop/hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/CHANGES.txt?rev=909235&r1=909234&r2=909235&view=diff
==============================================================================
--- hadoop/hbase/trunk/CHANGES.txt (original)
+++ hadoop/hbase/trunk/CHANGES.txt Fri Feb 12 05:06:06 2010
@@ -358,6 +358,7 @@
    HBASE-2209  Support of List [ ] in HBaseOutputWritable for serialization
                (Kay Kay via Stack)
    HBASE-2177  Add timestamping to gc logging option
+   HBASE-2066  Perf: parallelize puts
 
   NEW FEATURES
    HBASE-1961  HBase EC2 scripts

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HConnection.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HConnection.java?rev=909235&r1=909234&r2=909235&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HConnection.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HConnection.java Fri Feb 12 05:06:06 2010
@@ -22,6 +22,7 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.ExecutorService;
 
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.HServerAddress;
@@ -214,4 +215,9 @@
    */
   public int processBatchOfDeletes(List<Delete> list, byte[] tableName)
   throws IOException;
-}
\ No newline at end of file
+
+  /**
+   * Process a batch of Puts. The puts are grouped by regionserver and
+   * sent in parallel on the supplied thread pool; failures are retried.
+   * On return the list is empty; if retries are exhausted the remaining
+   * puts are left in the list and a RetriesExhaustedException is thrown.
+   */
+  public void processBatchOfPuts(List<Put> list,
+                                 final byte[] tableName, ExecutorService pool) throws IOException;
+}

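The new HConnection method takes the caller's thread pool rather than creating one, so the
degree of parallelism stays under the caller's control. Within this commit the intended
caller is HTable.flushCommits() (modified below); a client reaches it through the usual
buffered-write path. A minimal sketch, mirroring TestMultiParallelPut at the end of this
commit (the table and cell names here are illustrative only):

    // Buffered writes; flushCommits() fans out via processBatchOfPuts.
    HTable table = new HTable("my_table");    // hypothetical table
    table.setAutoFlush(false);                // buffer puts client-side
    Put p = new Put(Bytes.toBytes("row-1"));
    p.add(Bytes.toBytes("f"), Bytes.toBytes("q"), Bytes.toBytes("v"));
    table.put(p);                             // queued in the write buffer
    table.flushCommits();                     // one MultiPut per regionserver, in parallel
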
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HConnectionManager.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HConnectionManager.java?rev=909235&r1=909234&r2=909235&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HConnectionManager.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HConnectionManager.java Fri Feb 12 05:06:06 2010
@@ -29,6 +29,10 @@
 import java.util.Map;
 import java.util.TreeSet;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.logging.Log;
@@ -260,7 +264,7 @@
     private final Object userRegionLock = new Object();
         
     private volatile Configuration conf;
-    
+
     // Known region HServerAddress.toString() -> HRegionInterface 
     private final Map<String, HRegionInterface> servers =
       new ConcurrentHashMap<String, HRegionInterface>();
@@ -830,37 +834,40 @@
      * requirements.
      */
     private void deleteCachedLocation(final byte [] tableName,
-        final byte [] row) {
-      SoftValueSortedMap<byte [], HRegionLocation> tableLocations =
-        getTableLocations(tableName);
+                                      final byte [] row) {
+      synchronized (this.cachedRegionLocations) {
+        SoftValueSortedMap<byte [], HRegionLocation> tableLocations =
+            getTableLocations(tableName);
 
-      // start to examine the cache. we can only do cache actions
-      // if there's something in the cache for this table.
-      if (!tableLocations.isEmpty()) {
-        // cut the cache so that we only get the part that could contain
-        // regions that match our key
-        SoftValueSortedMap<byte [], HRegionLocation> matchingRegions =
-          tableLocations.headMap(row);
-
-        // if that portion of the map is empty, then we're done. otherwise,
-        // we need to examine the cached location to verify that it is 
-        // a match by end key as well.
-        if (!matchingRegions.isEmpty()) {
-          HRegionLocation possibleRegion =
-            matchingRegions.get(matchingRegions.lastKey());
-          byte [] endKey = possibleRegion.getRegionInfo().getEndKey();
-
-          // by nature of the map, we know that the start key has to be < 
-          // otherwise it wouldn't be in the headMap. 
-          if (KeyValue.getRowComparator(tableName).compareRows(endKey, 0, endKey.length,
-              row, 0, row.length) <= 0) {
-            // delete any matching entry
-            HRegionLocation rl =
-              tableLocations.remove(matchingRegions.lastKey());
-            if (rl != null && LOG.isDebugEnabled()) {
-              LOG.debug("Removed " + rl.getRegionInfo().getRegionNameAsString() +
-                " for tableName=" + Bytes.toString(tableName) + " from cache " +
-                "because of " + Bytes.toStringBinary(row));
+        // start to examine the cache. we can only do cache actions
+        // if there's something in the cache for this table.
+        if (!tableLocations.isEmpty()) {
+          // cut the cache so that we only get the part that could contain
+          // regions that match our key
+          SoftValueSortedMap<byte [], HRegionLocation> matchingRegions =
+              tableLocations.headMap(row);
+
+          // if that portion of the map is empty, then we're done. otherwise,
+          // we need to examine the cached location to verify that it is
+          // a match by end key as well.
+          if (!matchingRegions.isEmpty()) {
+            HRegionLocation possibleRegion =
+                matchingRegions.get(matchingRegions.lastKey());
+            byte [] endKey = possibleRegion.getRegionInfo().getEndKey();
+
+            // by nature of the map, we know that the start key has to be <
+            // otherwise it wouldn't be in the headMap.
+            if (Bytes.equals(endKey, HConstants.EMPTY_END_ROW) ||
+                KeyValue.getRowComparator(tableName).compareRows(endKey, 0, endKey.length,
+                    row, 0, row.length) > 0) {
+              // delete any matching entry
+              HRegionLocation rl =
+                  tableLocations.remove(matchingRegions.lastKey());
+              if (rl != null && LOG.isDebugEnabled()) {
+                LOG.debug("Removed " + rl.getRegionInfo().getRegionNameAsString() +
+                    " for tableName=" + Bytes.toString(tableName) + " from cache " +
+                    "because of " + Bytes.toStringBinary(row));
+              }
             }
           }
         }
@@ -909,7 +916,7 @@
             " is " + location.getServerAddress());
       }
     }
-    
+
     public HRegionInterface getHRegionConnection(
         HServerAddress regionServer, boolean getMaster) 
     throws IOException {
@@ -1295,5 +1302,127 @@
         }
       }
     }
-  } 
+
+    public void processBatchOfPuts(List<Put> list,
+                                   final byte[] tableName, ExecutorService pool) throws IOException {
+      for ( int tries = 0 ; tries < numRetries && !list.isEmpty(); ++tries) {
+        Collections.sort(list);
+        Map<HServerAddress, MultiPut> regionPuts =
+            new HashMap<HServerAddress, MultiPut>();
+        // step 1:
+        //  break up into regionserver-sized chunks and build the data structs
+        for ( Put put : list ) {
+          byte [] row = put.getRow();
+
+          HRegionLocation loc = locateRegion(tableName, row, true);
+          HServerAddress address = loc.getServerAddress();
+          byte [] regionName = loc.getRegionInfo().getRegionName();
+
+          MultiPut mput = regionPuts.get(address);
+          if (mput == null) {
+            mput = new MultiPut(address);
+            regionPuts.put(address, mput);
+          }
+          mput.add(regionName, put);
+        }
+
+        // step 2:
+        //  make the requests
+        // Discard the map, just use a list now, makes error recovery easier.
+        List<MultiPut> multiPuts = new ArrayList<MultiPut>(regionPuts.values());
+
+        List<Future<MultiPutResponse>> futures =
+            new ArrayList<Future<MultiPutResponse>>(regionPuts.size());
+        for ( MultiPut put : multiPuts ) {
+          futures.add(pool.submit(createPutCallable(put.address,
+              put,
+              tableName)));
+        }
+        // RUN!
+        List<Put> failed = new ArrayList<Put>();
+
+        // step 3:
+        //  collect the failures and tries from step 1.
+        for (int i = 0; i < futures.size(); i++ ) {
+          Future<MultiPutResponse> future = futures.get(i);
+          MultiPut request = multiPuts.get(i);
+          try {
+            MultiPutResponse resp = future.get();
+
+            // For each region
+            for (Map.Entry<byte[], List<Put>> e : request.puts.entrySet()) {
+              Integer result = resp.getAnswer(e.getKey());
+              if (result == null) {
+                // failed
+                LOG.debug("Failed all for region: " +
+                    Bytes.toStringBinary(e.getKey()) + ", removing from cache");
+                failed.addAll(e.getValue());
+              } else if (result >= 0) {
+                // some failures
+                List<Put> lst = e.getValue();
+                failed.addAll(lst.subList(result, lst.size()));
+                LOG.debug("Failed past " + result + " for region: " +
+                    Bytes.toStringBinary(e.getKey()) + ", removing from cache");
+              }
+            }
+          } catch (InterruptedException e) {
+            // go into the failed list.
+            LOG.debug("Failed all from " + request.address, e);
+            failed.addAll(request.allPuts());
+          } catch (ExecutionException e) {
+            // all go into the failed list.
+            LOG.debug("Failed all from " + request.address, e);
+            failed.addAll(request.allPuts());
+          }
+        }
+        list.clear();
+        if (!failed.isEmpty()) {
+          for (Put failedPut: failed) {
+            deleteCachedLocation(tableName, failedPut.getRow());
+          }
+
+          list.addAll(failed);
+
+          long sleepTime = getPauseTime(tries);
+          LOG.debug("processBatchOfPuts had some failures, sleeping for " + sleepTime +
+              " ms!");
+          try {
+            Thread.sleep(sleepTime);
+          } catch (InterruptedException e) {
+
+          }
+        }
+      }
+      if (!list.isEmpty()) {
+        // ran out of retries and didn't succeed with everything!
+        throw new RetriesExhaustedException("Still had " + list.size() + " puts left after retrying " +
+            numRetries + " times. Should have detail on which Regions failed the most");
+      }
+    }
+
+
+    private Callable<MultiPutResponse> createPutCallable(
+        final HServerAddress address, final MultiPut puts,
+        final byte [] tableName) {
+      final HConnection connection = this;
+      return new Callable<MultiPutResponse>() {
+        public MultiPutResponse call() throws IOException {
+          return getRegionServerWithRetries(
+              new ServerCallable<MultiPutResponse>(connection, tableName, null) {
+                public MultiPutResponse call() throws IOException {
+                  MultiPutResponse resp = server.multiPut(puts);
+                  resp.request = puts;
+                  return resp;
+                }
+                @Override
+                public void instantiateServer(boolean reload) throws IOException {
+                  server = connection.getHRegionConnection(address);
+                }
+              }
+          );
+        }
+      };
+    }
+  }
 }

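The three steps above are the client half of the protocol: sort the puts, group them into
one MultiPut per regionserver, submit each through the pool, then map every
MultiPutResponse back onto the puts that produced it. The per-region answer follows an
integer convention, inferred here from the handling in step 3 and from HRegionServer.put()
further below: a null answer means the region's whole batch failed; an answer n >= 0 means
the puts from index n onward failed (n equal to the list size is full success, since the
subList is then empty); and a negative answer also reads as full success. A comment-sized
decoding sketch with hypothetical local names:

    // Hypothetical decoder for one region's answer (convention inferred above).
    Integer answer = resp.getAnswer(regionName);
    List<Put> sent = request.puts.get(regionName);
    List<Put> failedPuts;
    if (answer == null) {
      failedPuts = sent;                               // whole region failed
    } else if (answer >= 0) {
      failedPuts = sent.subList(answer, sent.size());  // empty when all applied
    } else {
      failedPuts = Collections.emptyList();            // negative: full success
    }
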
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HTable.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HTable.java?rev=909235&r1=909234&r2=909235&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HTable.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HTable.java Fri Feb 12 05:06:06 2010
@@ -27,6 +27,12 @@
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -65,9 +71,10 @@
   private boolean autoFlush;
   private long currentWriteBufferSize;
   protected int scannerCaching;
-  private long maxScannerResultSize;
   private int maxKeyValueSize;
 
+  private long maxScannerResultSize;
+  
   /**
    * Creates an object to access a HBase table
    *
@@ -102,6 +109,7 @@
     this(conf, Bytes.toBytes(tableName));
   }
 
+
   /**
    * Creates an object to access a HBase table.
    *
@@ -126,12 +134,37 @@
     this.autoFlush = true;
     this.currentWriteBufferSize = 0;
     this.scannerCaching = conf.getInt("hbase.client.scanner.caching", 1);
+    
     this.maxScannerResultSize = conf.getLong(
       HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY, 
       HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE);
     this.maxKeyValueSize = conf.getInt("hbase.client.keyvalue.maxsize", -1);
+
+    int nrHRS = getCurrentNrHRS();
+    int nrThreads = conf.getInt("hbase.htable.threads.max", nrHRS);
+
+    // Unfortunately Executors.newCachedThreadPool does not allow us to
+    // set the maximum size of the pool, so we have to do it ourselves.
+    this.pool = new ThreadPoolExecutor(0, nrThreads,
+        60, TimeUnit.SECONDS,
+        new LinkedBlockingQueue<Runnable>(),
+        new DaemonThreadFactory());
+  }
+
+  /**
+   * TODO: might want to make this public; it would be nice if the number
+   * of threads changed automatically as servers were added and removed.
+   * @return the number of region servers that are currently running
+   * @throws IOException
+   */
+  private int getCurrentNrHRS() throws IOException {
+    HBaseAdmin admin = new HBaseAdmin(this.configuration);
+    return admin.getClusterStatus().getServers();
   }
 
+  // For multiput
+  private ExecutorService pool;
+
   /**
    * @param tableName name of table to check
    * @return true if table is on-line
@@ -591,11 +624,11 @@
    * @throws IOException
    */
   public void flushCommits() throws IOException {
-    int last = 0;
     try {
-      last = connection.processBatchOfRows(writeBuffer, tableName);
+      connection.processBatchOfPuts(writeBuffer,
+          tableName, pool);
     } finally {
-      writeBuffer.subList(0, last).clear();
+      // the write buffer was adjusted by processBatchOfPuts
       currentWriteBufferSize = 0;
       for (int i = 0; i < writeBuffer.size(); i++) {
         currentWriteBufferSize += writeBuffer.get(i).heapSize();
@@ -716,7 +749,7 @@
   public ArrayList<Put> getWriteBuffer() {
     return writeBuffer;
   }
-
+  
   /**
    * Implements the scanner interface for the HBase client.
    * If there are multiple regions in a table, this scanner will iterate
@@ -1016,4 +1049,31 @@
       };
     }
   }
+
+  static class DaemonThreadFactory implements ThreadFactory {
+    static final AtomicInteger poolNumber = new AtomicInteger(1);
+    final ThreadGroup group;
+    final AtomicInteger threadNumber = new AtomicInteger(1);
+    final String namePrefix;
+
+    DaemonThreadFactory() {
+      SecurityManager s = System.getSecurityManager();
+      group = (s != null) ? s.getThreadGroup() :
+                            Thread.currentThread().getThreadGroup();
+      namePrefix = "pool-" +
+          poolNumber.getAndIncrement() +
+          "-thread-";
+    }
+
+    public Thread newThread(Runnable r) {
+      Thread t = new Thread(group, r,
+          namePrefix + threadNumber.getAndIncrement(),
+          0);
+      if (!t.isDaemon())
+        t.setDaemon(true);
+      if (t.getPriority() != Thread.NORM_PRIORITY)
+        t.setPriority(Thread.NORM_PRIORITY);
+      return t;
+    }
+  }
 }

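One subtlety in the pool construction above: ThreadPoolExecutor only grows past its core
size when the work queue rejects an offer, so a core size of 0 over an unbounded
LinkedBlockingQueue keeps the pool at a single worker thread no matter what nrThreads is.
A sketch of a configuration that does fan out to nrThreads threads; this is an assumption
on my part, not what the patch does:

    // Alternative sizing sketch (not the patch's): core == max forces real
    // parallelism, and allowCoreThreadTimeOut (Java 6+) still lets idle
    // workers expire after the 60-second keep-alive.
    ThreadPoolExecutor pool = new ThreadPoolExecutor(nrThreads, nrThreads,
        60, TimeUnit.SECONDS,
        new LinkedBlockingQueue<Runnable>(),
        new DaemonThreadFactory());
    pool.allowCoreThreadTimeOut(true);  // otherwise core threads never expire
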
Added: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/MultiPut.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/MultiPut.java?rev=909235&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/MultiPut.java (added)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/MultiPut.java Fri Feb 12 05:06:06 2010
@@ -0,0 +1,107 @@
+/*
+ * Copyright 2009 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.client;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.HServerAddress;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.DataInput;
+import java.util.List;
+import java.util.Map;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.TreeMap;
+
+public class MultiPut implements Writable {
+  public HServerAddress address; // client code ONLY
+
+  // map of regions to lists of puts for that region.
+  public Map<byte[], List<Put> > puts = new TreeMap<byte[], List<Put>>(Bytes.BYTES_COMPARATOR);
+
+  public MultiPut() {}
+
+  public MultiPut(HServerAddress a) {
+    address = a;
+  }
+
+  public int size() {
+    int size = 0;
+    for( List<Put> l : puts.values()) {
+      size += l.size();
+    }
+    return size;
+  }
+  
+  public void add(byte[] regionName, Put aPut) {
+    List<Put> rsput = puts.get(regionName);
+    if (rsput == null) {
+      rsput = new ArrayList<Put>();
+      puts.put(regionName, rsput);
+    }
+    rsput.add(aPut);
+  }
+
+  public Collection<Put> allPuts() {
+    List<Put> res = new ArrayList<Put>();
+    for ( List<Put> pp : puts.values() ) {
+      res.addAll(pp);
+    }
+    return res;
+  }
+
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(puts.size());
+    for( Map.Entry<byte[],List<Put>> e : puts.entrySet()) {
+      Bytes.writeByteArray(out, e.getKey());
+
+      List<Put> ps = e.getValue();
+      out.writeInt(ps.size());
+      for( Put p : ps ) {
+        p.write(out);
+      }
+    }
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    puts.clear();
+
+    int mapSize = in.readInt();
+
+    for (int i = 0 ; i < mapSize; i++) {
+      byte[] key = Bytes.readByteArray(in);
+
+      int listSize = in.readInt();
+      List<Put> ps = new ArrayList<Put>(listSize);
+      for ( int j = 0 ; j < listSize; j++ ) {
+        Put put = new Put();
+        put.readFields(in);
+        ps.add(put);
+      }
+      puts.put(key, ps);
+    }
+  }
+}

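Note that MultiPut's Writable carries only the region-to-puts map; the address field is
client-side bookkeeping and never crosses the wire, so it comes back null on the receiving
end. A minimal round-trip sketch under that reading, using plain java.io streams (names
are illustrative):

    MultiPut mp = new MultiPut();
    Put p = new Put(Bytes.toBytes("row-1"));
    p.add(Bytes.toBytes("f"), Bytes.toBytes("q"), Bytes.toBytes("v"));
    mp.add(Bytes.toBytes("regionA"), p);

    ByteArrayOutputStream buf = new ByteArrayOutputStream();
    mp.write(new DataOutputStream(buf));

    MultiPut copy = new MultiPut();
    copy.readFields(new DataInputStream(
        new ByteArrayInputStream(buf.toByteArray())));
    // copy.size() == 1; copy.address == null, since it was never serialized.
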
Added: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/MultiPutResponse.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/MultiPutResponse.java?rev=909235&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/MultiPutResponse.java (added)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/MultiPutResponse.java Fri Feb 12 05:06:06 2010
@@ -0,0 +1,71 @@
+/*
+ * Copyright 2009 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.client;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.DataInput;
+import java.util.Map;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.TreeMap;
+
+public class MultiPutResponse implements Writable {
+
+  public MultiPut request; // used in client code ONLY
+
+  public Map<byte[], Integer> answers = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
+
+  public MultiPutResponse() {}
+
+  public void addResult(byte[] regionName, int result) {
+    answers.put(regionName, result);
+  }
+
+  public Integer getAnswer(byte[] region) {
+    return answers.get(region);
+  }
+  
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(answers.size());
+    for( Map.Entry<byte[],Integer> e : answers.entrySet()) {
+      Bytes.writeByteArray(out, e.getKey());
+      out.writeInt(e.getValue());
+    }
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    answers.clear();
+
+    int mapSize = in.readInt();
+    for( int i = 0 ; i < mapSize ; i++ ) {
+      byte[] key = Bytes.readByteArray(in);
+      int value = in.readInt();
+
+      answers.put(key, value);
+    }
+  }
+}

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java?rev=909235&r1=909234&r2=909235&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java Fri Feb 12 05:06:06 2010
@@ -48,6 +48,8 @@
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.MultiPutResponse;
+import org.apache.hadoop.hbase.client.MultiPut;
 import org.apache.hadoop.hbase.filter.*;
 import org.apache.hadoop.hbase.io.HbaseMapWritable;
 import org.apache.hadoop.io.MapWritable;
@@ -157,6 +159,10 @@
     addToMap(FirstKeyOnlyFilter.class, code++);
 
     addToMap(Delete [].class, code++);
+
+    addToMap(MultiPut.class, code++);
+    addToMap(MultiPutResponse.class, code++);
+
     addToMap(HLog.Entry.class, code++);
     addToMap(HLog.Entry[].class, code++);
     addToMap(HLogKey.class, code++);

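Since addToMap hands out codes sequentially, registering MultiPut and MultiPutResponse
ahead of the HLog entries shifts the codes of every class added after them. Old and new
clients therefore disagree on the wire encoding, which is what the RPC protocol version
bump in the next hunk accounts for.
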
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/ipc/HBaseRPCProtocolVersion.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/ipc/HBaseRPCProtocolVersion.java?rev=909235&r1=909234&r2=909235&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/ipc/HBaseRPCProtocolVersion.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/ipc/HBaseRPCProtocolVersion.java Fri Feb 12 05:06:06 2010
@@ -74,7 +74,8 @@
    * <li>Version 20: Backed Transaction HBase out of HBase core.</li>
    * <li>Version 21: HBASE-1665.</li>
    * <li>Version 22: HBASE-2209. Added List support to RPC</li>
+   * <li>Version 23: HBASE-2066, multi-put.</li>
    * </ul>
    */
-  public static final long versionID = 22L;
+  public static final long versionID = 23L;
 }

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java?rev=909235&r1=909234&r2=909235&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java Fri Feb 12 05:06:06 2010
@@ -29,6 +29,8 @@
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.MultiPutResponse;
+import org.apache.hadoop.hbase.client.MultiPut;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 
 /**
@@ -245,4 +247,15 @@
    * @throws IOException
    */
   public HServerInfo getHServerInfo() throws IOException;
+
+
+  /**
+   * Multi put for putting multiple regions worth of puts at once.
+   *
+   * @param puts the request
+   * @return the reply
+   * @throws IOException
+   */
+  public MultiPutResponse multiPut(MultiPut puts) throws IOException;
+
 }

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=909235&r1=909234&r2=909235&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Fri Feb 12 05:06:06 2010
@@ -82,6 +82,8 @@
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.ServerConnection;
 import org.apache.hadoop.hbase.client.ServerConnectionManager;
+import org.apache.hadoop.hbase.client.MultiPutResponse;
+import org.apache.hadoop.hbase.client.MultiPut;
 import org.apache.hadoop.hbase.io.hfile.LruBlockCache;
 import org.apache.hadoop.hbase.ipc.HBaseRPC;
 import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler;
@@ -1701,17 +1703,17 @@
       if (!region.getRegionInfo().isMetaTable()) {
         this.cacheFlusher.reclaimMemStoreMemory();
       }
-      Integer[] locks = new Integer[puts.length];
       for (i = 0; i < puts.length; i++) {
         this.requestCount.incrementAndGet();
-        locks[i] = getLockFromId(puts[i].getLockId());
-        region.put(puts[i], locks[i]);
+        Integer lock = getLockFromId(puts[i].getLockId());
+        region.put(puts[i], lock);
       }
 
     } catch (WrongRegionException ex) {
       LOG.debug("Batch puts: " + i, ex);
       return i;
     } catch (NotServingRegionException ex) {
+      LOG.debug("Batch puts: " + i, ex);
       return i;
     } catch (Throwable t) {
       throw convertThrowableToIOE(cleanup(t));
@@ -2427,4 +2429,20 @@
     doMain(args, regionServerClass);
   }
 
+
+  @Override
+  public MultiPutResponse multiPut(MultiPut puts) throws IOException {
+    MultiPutResponse resp = new MultiPutResponse();
+
+    // do each region as its own batch.
+    for( Map.Entry<byte[],List<Put>> e: puts.puts.entrySet()) {
+      int result = put(e.getKey(), e.getValue().toArray(new Put[]{}));
+      resp.addResult(e.getKey(), result);
+
+      e.getValue().clear(); // clear some RAM
+    }
+
+    return resp;
+  }
+
 }

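Server-side the handler stays deliberately simple: multiPut() replays the existing
put(byte[], Put[]) path once per region, so each region's answer is just that method's
return value, namely the index of the first put that hit a WrongRegionException or
NotServingRegionException, or the method's pre-existing negative success sentinel when the
whole array applied cleanly. Partial failure within a region is thus reported by position,
which is what lets HConnectionManager retry only the tail of each region's list.
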
Modified: hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/HBaseClusterTestCase.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/HBaseClusterTestCase.java?rev=909235&r1=909234&r2=909235&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/HBaseClusterTestCase.java (original)
+++ hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/HBaseClusterTestCase.java Fri Feb 12 05:06:06 2010
@@ -82,6 +82,8 @@
   }
 
   /**
+   * Subclass hook.
+   * 
    * Run after dfs is ready but before hbase cluster is started up.
    */
   protected void preHBaseClusterSetup() throws Exception {

Modified: hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/MultiRegionTable.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/MultiRegionTable.java?rev=909235&r1=909234&r2=909235&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/MultiRegionTable.java (original)
+++ hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/MultiRegionTable.java Fri Feb 12 05:06:06 2010
@@ -28,7 +28,7 @@
  * Utility class to build a table of multiple regions.
  */
 public class MultiRegionTable extends HBaseClusterTestCase {
-  private static final byte [][] KEYS = {
+  protected static final byte [][] KEYS = {
     HConstants.EMPTY_BYTE_ARRAY,
     Bytes.toBytes("bbb"),
     Bytes.toBytes("ccc"),
@@ -63,8 +63,13 @@
    * @param familyName the family to populate.
    */
   public MultiRegionTable(final String familyName) {
-    super();
-    this.columnFamily = Bytes.toBytes(familyName);
+    this(1, familyName);
+  }
+
+  public MultiRegionTable(int nServers, final String familyName) {
+    super(nServers);
+
+    this.columnFamily = Bytes.toBytes(familyName);
     // These are needed for the new and improved Map/Reduce framework
     System.setProperty("hadoop.log.dir", conf.get("hadoop.log.dir"));
     conf.set("mapred.output.dir", conf.get("hadoop.tmp.dir"));

Added: hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/TestMultiParallelPut.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/TestMultiParallelPut.java?rev=909235&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/TestMultiParallelPut.java (added)
+++ hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/TestMultiParallelPut.java Fri Feb 12 05:06:06 2010
@@ -0,0 +1,97 @@
+/*
+ * Copyright 2009 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase;
+
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import java.util.List;
+import java.util.ArrayList;
+
+public class TestMultiParallelPut extends MultiRegionTable {
+  private static final byte[] VALUE = Bytes.toBytes("value");
+  private static final byte[] QUALIFIER = Bytes.toBytes("qual");
+  private static final String FAMILY = "family";
+  private static final String TEST_TABLE = "test_table";
+  private static final byte[] BYTES_FAMILY = Bytes.toBytes(FAMILY);
+
+
+  public TestMultiParallelPut() {
+    super(2, FAMILY);
+    desc = new HTableDescriptor(TEST_TABLE);
+    desc.addFamily(new HColumnDescriptor(FAMILY));
+
+    makeKeys();
+  }
+
+  private void makeKeys() {
+    for (byte [] k : KEYS) {
+      byte [] cp = new byte[k.length+1];
+      System.arraycopy(k, 0, cp, 0, k.length);
+      cp[k.length] = 1;
+
+      keys.add(cp);
+    }
+  }
+
+  List<byte[]> keys = new ArrayList<byte[]>();
+
+  public void testMultiPut() throws Exception {
+
+    HTable table = new HTable(TEST_TABLE);
+    table.setAutoFlush(false);
+    table.setWriteBufferSize(10 * 1024 * 1024);
+
+    for ( byte [] k : keys ) {
+      Put put = new Put(k);
+      put.add(BYTES_FAMILY, QUALIFIER, VALUE);
+
+      table.put(put);
+    }
+
+    table.flushCommits();
+
+    for (byte [] k : keys ) {
+      Get get = new Get(k);
+      get.addColumn(BYTES_FAMILY, QUALIFIER);
+
+      Result r = table.get(get);
+
+      assertTrue(r.containsColumn(BYTES_FAMILY, QUALIFIER));
+      assertEquals(0,
+          Bytes.compareTo(VALUE,
+              r.getValue(BYTES_FAMILY, QUALIFIER)));
+    }
+
+    HBaseAdmin admin = new HBaseAdmin(conf);
+    ClusterStatus cs = admin.getClusterStatus();
+
+    assertEquals(2, cs.getServers());
+    for ( HServerInfo info : cs.getServerInfo()) {
+      System.out.println(info);
+      assertTrue( info.getLoad().getNumberOfRegions() > 10);
+    }
+  }
+}


