hbase-commits mailing list archives

From: jmhs...@apache.org
Subject: svn commit: r1446147 [9/35] - in /hbase/branches/hbase-7290v2: ./ bin/ conf/ dev-support/ hbase-client/ hbase-common/ hbase-common/src/main/java/org/apache/hadoop/hbase/ hbase-common/src/main/java/org/apache/hadoop/hbase/io/compress/ hbase-common/src/m...
Date: Thu, 14 Feb 2013 12:58:21 GMT
Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HTable.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HTable.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HTable.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HTable.java Thu Feb 14 12:58:12 2013
@@ -21,16 +21,15 @@ package org.apache.hadoop.hbase.client;
 import java.io.Closeable;
 import java.io.IOException;
 import java.io.InterruptedIOException;
-import java.lang.reflect.Proxy;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
 import java.util.TreeMap;
 import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
@@ -53,20 +52,17 @@ import org.apache.hadoop.hbase.ServerNam
 import org.apache.hadoop.hbase.client.HConnectionManager.HConnectable;
 import org.apache.hadoop.hbase.client.coprocessor.Batch;
 import org.apache.hadoop.hbase.filter.BinaryComparator;
-import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
 import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
-import org.apache.hadoop.hbase.ipc.ExecRPCInvoker;
 import org.apache.hadoop.hbase.ipc.RegionCoprocessorRpcChannel;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.RequestConverter;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetRequest;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetResponse;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.LockRowRequest;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.LockRowResponse;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiGetRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiGetResponse;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateResponse;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.UnlockRowRequest;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.CompareType;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
@@ -245,7 +241,6 @@ public class HTable implements HTableInt
 
   /**
    * setup this HTable's parameter based on the passed configuration
-   * @param conf
    */
   private void finishSetup() throws IOException {
     this.connection.locateRegion(tableName, HConstants.EMPTY_START_ROW);
@@ -347,11 +342,10 @@ public class HTable implements HTableInt
   }
 
   /**
-   * Finds the region on which the given row is being served.
+   * Finds the region on which the given row is being served. Does not reload the cache.
    * @param row Row to find.
    * @return Location of the row.
    * @throws IOException if a remote or network exception occurs
-   * @deprecated use {@link #getRegionLocation(byte [], boolean)} instead
    */
   public HRegionLocation getRegionLocation(final byte [] row)
   throws IOException {
@@ -361,8 +355,7 @@ public class HTable implements HTableInt
   /**
    * Finds the region on which the given row is being served.
    * @param row Row to find.
-   * @param reload whether or not to reload information or just use cached
-   * information
+   * @param reload true to reload information or false to use cached information
    * @return Location of the row.
    * @throws IOException if a remote or network exception occurs
    */
@@ -882,6 +875,147 @@ public class HTable implements HTableInt
   }
 
   /**
+   * Keeps track of the initial position of a Get in a list before the list is sorted.
+   * This is used to send results back in the order in which the Gets were originally
+   * received.
+   */
+  private static class SortedGet implements Comparable<SortedGet> {
+    protected int initialIndex = -1; // Used to store the get initial index in a list.
+    protected Get get; // Encapsulated Get instance.
+
+    public SortedGet(Get get, int initialIndex) {
+      this.get = get;
+      this.initialIndex = initialIndex;
+    }
+
+    public int getInitialIndex() {
+      return initialIndex;
+    }
+
+    @Override
+    public int compareTo(SortedGet o) {
+      return get.compareTo(o.get);
+    }
+
+    public Get getGet() {
+      return get;
+    }
+
+    @Override
+    public int hashCode() {
+      return get.hashCode();
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (obj instanceof SortedGet)
+        return get.equals(((SortedGet)obj).get);
+      else
+        return false;
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public Boolean[] exists(final List<Get> gets) throws IOException {
+    // Prepare the sorted list of Gets. Take the list of Gets received and encapsulate each
+    // one in a SortedGet instance. This is a single pass over the list, so complexity here
+    // is O(n). The list is later used to recreate the response order based on the order in
+    // which the Gets were originally received.
+    ArrayList<SortedGet> sortedGetsList = new ArrayList<HTable.SortedGet>();
+    for (int indexGet = 0; indexGet < gets.size(); indexGet++) {
+      sortedGetsList.add(new SortedGet(gets.get(indexGet), indexGet));
+    }
+
+    // Sort the list so the Gets are ordered by row key.
+    Collections.sort(sortedGetsList); // O(n log n)
+
+    // step 1: group the requests by region so they can be sent as one batch per region.
+    // Map key is the startKey index. Map value is the list of Gets for the region starting
+    // with that startKey.
+    Map<Integer, List<Get>> getsByRegion = new HashMap<Integer, List<Get>>();
+
+    // Reference map to quickly find the region a Get belongs to.
+    Map<Get, Integer> getToRegionIndexMap = new HashMap<Get, Integer>();
+    Pair<byte[][], byte[][]> startEndKeys = getStartEndKeys();
+
+    int regionIndex = 0;
+    for (final SortedGet get : sortedGetsList) {
+      // Progress on the regions until we find the one the current get resides in.
+      while ((regionIndex < startEndKeys.getSecond().length)
+          && (Bytes.compareTo(startEndKeys.getSecond()[regionIndex], get.getGet().getRow()) <= 0)) {
+        regionIndex++;
+      }
+      List<Get> regionGets = getsByRegion.get(regionIndex);
+      if (regionGets == null) {
+        regionGets = new ArrayList<Get>();
+        getsByRegion.put(regionIndex, regionGets);
+      }
+      regionGets.add(get.getGet());
+      getToRegionIndexMap.put(get.getGet(), regionIndex);
+    }
+
+    // step 2: make the requests
+    Map<Integer, Future<List<Boolean>>> futures =
+        new HashMap<Integer, Future<List<Boolean>>>(sortedGetsList.size());
+    for (final Map.Entry<Integer, List<Get>> getsByRegionEntry : getsByRegion.entrySet()) {
+      Callable<List<Boolean>> callable = new Callable<List<Boolean>>() {
+        public List<Boolean> call() throws Exception {
+          return new ServerCallable<List<Boolean>>(connection, tableName, getsByRegionEntry.getValue()
+              .get(0).getRow(), operationTimeout) {
+            public List<Boolean> call() throws IOException {
+              try {
+                MultiGetRequest requests = RequestConverter.buildMultiGetRequest(location
+                    .getRegionInfo().getRegionName(), getsByRegionEntry.getValue(), true, false);
+                MultiGetResponse responses = server.multiGet(null, requests);
+                return responses.getExistsList();
+              } catch (ServiceException se) {
+                throw ProtobufUtil.getRemoteException(se);
+              }
+            }
+          }.withRetries();
+        }
+      };
+      futures.put(getsByRegionEntry.getKey(), pool.submit(callable));
+    }
+
+    // step 3: collect the failures and successes
+    Map<Integer, List<Boolean>> responses = new HashMap<Integer, List<Boolean>>();
+    for (final Map.Entry<Integer, List<Get>> sortedGetEntry : getsByRegion.entrySet()) {
+      try {
+        Future<List<Boolean>> future = futures.get(sortedGetEntry.getKey());
+        List<Boolean> resp = future.get();
+
+        if (resp == null) {
+          LOG.warn("Failed for gets on region: " + sortedGetEntry.getKey());
+        }
+        responses.put(sortedGetEntry.getKey(), resp);
+      } catch (ExecutionException e) {
+        LOG.warn("Failed for gets on region: " + sortedGetEntry.getKey());
+      } catch (InterruptedException e) {
+        LOG.warn("Failed for gets on region: " + sortedGetEntry.getKey());
+        Thread.currentThread().interrupt();
+      }
+    }
+    Boolean[] results = new Boolean[sortedGetsList.size()];
+
+    // step 4: build the response.
+    Map<Integer, Integer> indexes = new HashMap<Integer, Integer>();
+    for (int i = 0; i < sortedGetsList.size(); i++) {
+      Integer regionInfoIndex = getToRegionIndexMap.get(sortedGetsList.get(i).getGet());
+      Integer index = indexes.get(regionInfoIndex);
+      if (index == null) {
+        index = 0;
+      }
+      List<Boolean> regionResponse = responses.get(regionInfoIndex);
+      // A null response means the gets for this region failed, even after retries; surface
+      // that as a null entry rather than a NullPointerException.
+      results[sortedGetsList.get(i).getInitialIndex()] =
+          (regionResponse == null) ? null : regionResponse.get(index);
+      indexes.put(regionInfoIndex, index + 1);
+    }
+
+    return results;
+  }
+
+  /**
    * {@inheritDoc}
    */
   @Override
@@ -960,7 +1094,7 @@ public class HTable implements HTableInt
   }
 
   // validate for well-formedness
-  private void validatePut(final Put put) throws IllegalArgumentException{
+  public void validatePut(final Put put) throws IllegalArgumentException {
     if (put.isEmpty()) {
       throw new IllegalArgumentException("No columns to insert");
     }
@@ -979,46 +1113,6 @@ public class HTable implements HTableInt
    * {@inheritDoc}
    */
   @Override
-  public RowLock lockRow(final byte [] row)
-  throws IOException {
-    return new ServerCallable<RowLock>(connection, tableName, row, operationTimeout) {
-        public RowLock call() throws IOException {
-          try {
-            LockRowRequest request = RequestConverter.buildLockRowRequest(
-              location.getRegionInfo().getRegionName(), row);
-            LockRowResponse response = server.lockRow(null, request);
-            return new RowLock(row, response.getLockId());
-          } catch (ServiceException se) {
-            throw ProtobufUtil.getRemoteException(se);
-          }
-        }
-      }.withRetries();
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  @Override
-  public void unlockRow(final RowLock rl)
-  throws IOException {
-    new ServerCallable<Boolean>(connection, tableName, rl.getRow(), operationTimeout) {
-        public Boolean call() throws IOException {
-          try {
-            UnlockRowRequest request = RequestConverter.buildUnlockRowRequest(
-              location.getRegionInfo().getRegionName(), rl.getLockId());
-            server.unlockRow(null, request);
-            return Boolean.TRUE;
-          } catch (ServiceException se) {
-            throw ProtobufUtil.getRemoteException(se);
-          }
-        }
-      }.withRetries();
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  @Override
   public boolean isAutoFlush() {
     return autoFlush;
   }
@@ -1195,22 +1289,6 @@ public class HTable implements HTableInt
   /**
    * {@inheritDoc}
    */
-  @Override
-  @Deprecated
-  public <T extends CoprocessorProtocol> T coprocessorProxy(
-      Class<T> protocol, byte[] row) {
-    return (T)Proxy.newProxyInstance(this.getClass().getClassLoader(),
-        new Class[]{protocol},
-        new ExecRPCInvoker(configuration,
-            connection,
-            protocol,
-            tableName,
-            row));
-  }
-
-  /**
-   * {@inheritDoc}
-   */
   public CoprocessorRpcChannel coprocessorService(byte[] row) {
     return new RegionCoprocessorRpcChannel(connection, tableName, row);
   }
@@ -1219,57 +1297,14 @@ public class HTable implements HTableInt
    * {@inheritDoc}
    */
   @Override
-  @Deprecated
-  public <T extends CoprocessorProtocol, R> Map<byte[],R> coprocessorExec(
-      Class<T> protocol, byte[] startKey, byte[] endKey,
-      Batch.Call<T,R> callable)
-      throws IOException, Throwable {
-
-    final Map<byte[],R> results =  Collections.synchronizedMap(new TreeMap<byte[],R>(
-        Bytes.BYTES_COMPARATOR));
-    coprocessorExec(protocol, startKey, endKey, callable,
-        new Batch.Callback<R>(){
-      public void update(byte[] region, byte[] row, R value) {
-        results.put(region, value);
-      }
-    });
-    return results;
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  @Override
-  @Deprecated
-  public <T extends CoprocessorProtocol, R> void coprocessorExec(
-      Class<T> protocol, byte[] startKey, byte[] endKey,
-      Batch.Call<T,R> callable, Batch.Callback<R> callback)
-      throws IOException, Throwable {
-
-    // get regions covered by the row range
-    List<byte[]> keys = getStartKeysInRange(startKey, endKey);
-    connection.processExecs(protocol, keys, tableName, pool, callable,
-        callback);
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  @Override
   public <T extends Service, R> Map<byte[],R> coprocessorService(final Class<T> service,
       byte[] startKey, byte[] endKey, final Batch.Call<T,R> callable)
       throws ServiceException, Throwable {
-    final Map<byte[],R> results =  new ConcurrentSkipListMap<byte[], R>(Bytes.BYTES_COMPARATOR);
+    final Map<byte[],R> results =  Collections.synchronizedMap(
+        new TreeMap<byte[], R>(Bytes.BYTES_COMPARATOR));
     coprocessorService(service, startKey, endKey, callable, new Batch.Callback<R>() {
       public void update(byte[] region, byte[] row, R value) {
-        if (value == null) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Call to " + service.getName() +
-                " received NULL value from Batch.Call for region " + Bytes.toStringBinary(region));
-          }
-        } else {
-          results.put(region, value);
-        }
+        results.put(region, value);
       }
     });
     return results;
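
For illustration, the new batched existence check above can be exercised as in this minimal sketch; the table name and row keys are assumptions, not part of the commit. A null entry in the returned array means the checks for that region failed, even after retries.

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.Get;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.util.Bytes;

    public class ExistsBatchSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        HTable table = new HTable(conf, "mytable");  // table name is an assumption
        List<Get> gets = new ArrayList<Get>();
        gets.add(new Get(Bytes.toBytes("row1")));
        gets.add(new Get(Bytes.toBytes("row2")));
        // Results come back in the same order the Gets were passed in, thanks to SortedGet.
        Boolean[] found = table.exists(gets);
        for (int i = 0; i < found.length; i++) {
          System.out.println("Get #" + i + " matched: " + found[i]);
        }
        table.close();
      }
    }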

Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java Thu Feb 14 12:58:12 2013
@@ -31,7 +31,6 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.coprocessor.Batch;
-import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
 import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
 
 /**
@@ -65,7 +64,7 @@ public interface HTableInterface extends
   HTableDescriptor getTableDescriptor() throws IOException;
 
   /**
-   * Test for the existence of columns in the table, as specified in the Get.
+   * Test for the existence of columns in the table, as specified by the Get.
    * <p>
    *
    * This will return true if the Get matches one or more keys, false if not.
@@ -81,12 +80,29 @@ public interface HTableInterface extends
   boolean exists(Get get) throws IOException;
 
   /**
-   * Method that does a batch call on Deletes, Gets and Puts. The ordering of
-   * execution of the actions is not defined. Meaning if you do a Put and a
+   * Test for the existence of columns in the table, as specified by the Gets.
+   * <p>
+   *
+   * This will return an array of booleans. Each value will be true if the related Get matches
+   * one or more keys, false if not.
+   * <p>
+   *
+   * This is a server-side call so it prevents any data from being transferred to
+   * the client.
+   *
+   * @param gets the Gets
+   * @return Array of Booleans; each entry is true if the corresponding Get matches one or
+   *         more keys, false if not
+   * @throws IOException e
+   */
+  Boolean[] exists(List<Get> gets) throws IOException;
+
+  /**
+   * Method that does a batch call on Deletes, Gets, Puts, Increments, Appends and RowMutations.
+   * The ordering of execution of the actions is not defined. This means that if you do a Put and a
    * Get in the same {@link #batch} call, you will not necessarily be
    * guaranteed that the Get returns what the Put had put.
    *
-   * @param actions list of Get, Put, Delete objects
+   * @param actions list of Get, Put, Delete, Increment, Append, RowMutations objects
    * @param results Empty Object[], same size as actions. Provides access to partial
    *                results, in case an exception is thrown. A null in the result array means that
    *                the call for that action failed, even after retries
@@ -99,7 +115,7 @@ public interface HTableInterface extends
    * Same as {@link #batch(List, Object[])}, but returns an array of
    * results instead of using a results parameter reference.
    *
-   * @param actions list of Get, Put, Delete objects
+   * @param actions list of Get, Put, Delete, Increment, Append, RowMutations objects
    * @return the results from the actions. A null in the return array means that
    *         the call for that action failed, even after retries
    * @throws IOException
@@ -395,116 +411,6 @@ public interface HTableInterface extends
   void close() throws IOException;
 
   /**
-   * Obtains a lock on a row.
-   *
-   * @param row The row to lock.
-   * @return A {@link RowLock} containing the row and lock id.
-   * @throws IOException if a remote or network exception occurs.
-   * @see RowLock
-   * @see #unlockRow
-   */
-  RowLock lockRow(byte[] row) throws IOException;
-
-  /**
-   * Releases a row lock.
-   *
-   * @param rl The row lock to release.
-   * @throws IOException if a remote or network exception occurs.
-   * @see RowLock
-   * @see #unlockRow
-   */
-  void unlockRow(RowLock rl) throws IOException;
-
-  /**
-   * Creates and returns a proxy to the CoprocessorProtocol instance running in the
-   * region containing the specified row.  The row given does not actually have
-   * to exist.  Whichever region would contain the row based on start and end keys will
-   * be used.  Note that the {@code row} parameter is also not passed to the
-   * coprocessor handler registered for this protocol, unless the {@code row}
-   * is separately passed as an argument in a proxy method call.  The parameter
-   * here is just used to locate the region used to handle the call.
-   *
-   * @param protocol The class or interface defining the remote protocol
-   * @param row The row key used to identify the remote region location
-   * @return A CoprocessorProtocol instance
-   * @deprecated since 0.96.  Use {@link HTableInterface#coprocessorService(byte[])} instead.
-   */
-  @Deprecated
-  <T extends CoprocessorProtocol> T coprocessorProxy(Class<T> protocol, byte[] row);
-
-  /**
-   * Invoke the passed
-   * {@link org.apache.hadoop.hbase.client.coprocessor.Batch.Call} against
-   * the {@link CoprocessorProtocol} instances running in the selected regions.
-   * All regions beginning with the region containing the <code>startKey</code>
-   * row, through to the region containing the <code>endKey</code> row (inclusive)
-   * will be used.  If <code>startKey</code> or <code>endKey</code> is
-   * <code>null</code>, the first and last regions in the table, respectively,
-   * will be used in the range selection.
-   *
-   * @param protocol the CoprocessorProtocol implementation to call
-   * @param startKey start region selection with region containing this row
-   * @param endKey select regions up to and including the region containing
-   * this row
-   * @param callable wraps the CoprocessorProtocol implementation method calls
-   * made per-region
-   * @param <T> CoprocessorProtocol subclass for the remote invocation
-   * @param <R> Return type for the
-   * {@link org.apache.hadoop.hbase.client.coprocessor.Batch.Call#call(Object)}
-   * method
-   * @return a <code>Map</code> of region names to
-   * {@link org.apache.hadoop.hbase.client.coprocessor.Batch.Call#call(Object)} return values
-   *
-   * @deprecated since 0.96.  Use
-   * {@link HTableInterface#coprocessorService(Class, byte[], byte[], org.apache.hadoop.hbase.client.coprocessor.Batch.Call)} instead.
-   */
-  @Deprecated
-  <T extends CoprocessorProtocol, R> Map<byte[],R> coprocessorExec(
-      Class<T> protocol, byte[] startKey, byte[] endKey, Batch.Call<T,R> callable)
-      throws IOException, Throwable;
-
-  /**
-   * Invoke the passed
-   * {@link org.apache.hadoop.hbase.client.coprocessor.Batch.Call} against
-   * the {@link CoprocessorProtocol} instances running in the selected regions.
-   * All regions beginning with the region containing the <code>startKey</code>
-   * row, through to the region containing the <code>endKey</code> row
-   * (inclusive)
-   * will be used.  If <code>startKey</code> or <code>endKey</code> is
-   * <code>null</code>, the first and last regions in the table, respectively,
-   * will be used in the range selection.
-   *
-   * <p>
-   * For each result, the given
-   * {@link org.apache.hadoop.hbase.client.coprocessor.Batch.Callback#update(byte[], byte[], Object)}
-   * method will be called.
-   *</p>
-   *
-   * @param protocol the CoprocessorProtocol implementation to call
-   * @param startKey start region selection with region containing this row
-   * @param endKey select regions up to and including the region containing
-   * this row
-   * @param callable wraps the CoprocessorProtocol implementation method calls
-   * made per-region
-   * @param callback an instance upon which
-   * {@link org.apache.hadoop.hbase.client.coprocessor.Batch.Callback#update(byte[], byte[], Object)} with the
-   * {@link org.apache.hadoop.hbase.client.coprocessor.Batch.Call#call(Object)}
-   * return value for each region
-   * @param <T> CoprocessorProtocol subclass for the remote invocation
-   * @param <R> Return type for the
-   * {@link org.apache.hadoop.hbase.client.coprocessor.Batch.Call#call(Object)}
-   * method
-   *
-   * @deprecated since 0.96.
-   * Use {@link HTableInterface#coprocessorService(Class, byte[], byte[], org.apache.hadoop.hbase.client.coprocessor.Batch.Call, org.apache.hadoop.hbase.client.coprocessor.Batch.Callback)} instead.
-   */
-  @Deprecated
-  <T extends CoprocessorProtocol, R> void coprocessorExec(
-      Class<T> protocol, byte[] startKey, byte[] endKey,
-      Batch.Call<T,R> callable, Batch.Callback<R> callback)
-      throws IOException, Throwable;
-
-  /**
    * Creates and returns a {@link com.google.protobuf.RpcChannel} instance connected to the
    * table region containing the specified row.  The row given does not actually have
    * to exist.  Whichever region would contain the row based on start and end keys will

Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HTablePool.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HTablePool.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HTablePool.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HTablePool.java Thu Feb 14 12:58:12 2013
@@ -33,7 +33,6 @@ import org.apache.hadoop.hbase.HBaseConf
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.client.coprocessor.Batch;
 import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
-import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
 import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.PoolMap;
@@ -41,7 +40,7 @@ import org.apache.hadoop.hbase.util.Pool
 
 /**
  * A simple pool of HTable instances.
- * 
+ *
  * Each HTablePool acts as a pool for all tables. To use, instantiate an
  * HTablePool and use {@link #getTable(String)} to get an HTable from the pool.
  *
@@ -51,12 +50,12 @@ import org.apache.hadoop.hbase.util.Pool
  * Once you are done with it, close your instance of {@link HTableInterface}
  * by calling {@link HTableInterface#close()} rather than returning the tables
  * to the pool with (deprecated) {@link #putTable(HTableInterface)}.
- * 
+ *
  * <p>
  * A pool can be created with a <i>maxSize</i> which defines the most HTable
  * references that will ever be retained for each table. Otherwise the default
  * is {@link Integer#MAX_VALUE}.
- * 
+ *
  * <p>
  * Pool will manage its own connections to the cluster. See
  * {@link HConnectionManager}.
@@ -79,7 +78,7 @@ public class HTablePool implements Close
 
   /**
    * Constructor to set maximum versions and use the specified configuration.
-   * 
+   *
    * @param config
    *          configuration
    * @param maxSize
@@ -92,7 +91,7 @@ public class HTablePool implements Close
   /**
    * Constructor to set maximum versions and use the specified configuration and
    * table factory.
-   * 
+   *
    * @param config
    *          configuration
    * @param maxSize
@@ -108,7 +107,7 @@ public class HTablePool implements Close
   /**
    * Constructor to set maximum versions and use the specified configuration and
    * pool type.
-   * 
+   *
    * @param config
    *          configuration
    * @param maxSize
@@ -128,7 +127,7 @@ public class HTablePool implements Close
    * {@link PoolType#Reusable} and {@link PoolType#ThreadLocal}. If the pool
    * type is null or not one of those two values, then it will default to
    * {@link PoolType#Reusable}.
-   * 
+   *
    * @param config
    *          configuration
    * @param maxSize
@@ -168,7 +167,7 @@ public class HTablePool implements Close
    * Get a reference to the specified table from the pool.
    * <p>
    * <p/>
-   * 
+   *
    * @param tableName
    *          table name
    * @return a reference to the specified table
@@ -186,9 +185,9 @@ public class HTablePool implements Close
   /**
    * Get a reference to the specified table from the pool.
    * <p>
-   * 
+   *
    * Create a new one if one is not available.
-   * 
+   *
    * @param tableName
    *          table name
    * @return a reference to the specified table
@@ -206,9 +205,9 @@ public class HTablePool implements Close
   /**
    * Get a reference to the specified table from the pool.
    * <p>
-   * 
+   *
    * Create a new one if one is not available.
-   * 
+   *
    * @param tableName
    *          table name
    * @return a reference to the specified table
@@ -222,7 +221,7 @@ public class HTablePool implements Close
   /**
    * This method is not needed anymore, clients should call
    * HTableInterface.close() rather than returning the tables to the pool
-   * 
+   *
    * @param table
    *          the proxy table user got from pool
    * @deprecated
@@ -248,10 +247,10 @@ public class HTablePool implements Close
   /**
    * Puts the specified HTable back into the pool.
    * <p>
-   * 
+   *
    * If the pool already contains <i>maxSize</i> references to the table, then
    * the table instance gets closed after flushing buffered edits.
-   * 
+   *
    * @param table
    *          table
    */
@@ -279,7 +278,7 @@ public class HTablePool implements Close
    * Note: this is a 'shutdown' of the given table pool and different from
    * {@link #putTable(HTableInterface)}, that is used to return the table
    * instance to the pool for future re-use.
-   * 
+   *
    * @param tableName
    */
   public void closeTablePool(final String tableName) throws IOException {
@@ -294,7 +293,7 @@ public class HTablePool implements Close
 
   /**
    * See {@link #closeTablePool(String)}.
-   * 
+   *
    * @param tableName
    */
   public void closeTablePool(final byte[] tableName) throws IOException {
@@ -314,7 +313,7 @@ public class HTablePool implements Close
     this.tables.clear();
   }
 
-  int getCurrentPoolSize(String tableName) {
+  public int getCurrentPoolSize(String tableName) {
     return tables.size(tableName);
   }
 
@@ -352,6 +351,11 @@ public class HTablePool implements Close
     }
 
     @Override
+    public Boolean[] exists(List<Get> gets) throws IOException {
+      return table.exists(gets);
+    }
+
+    @Override
     public void batch(List<? extends Row> actions, Object[] results) throws IOException,
         InterruptedException {
       table.batch(actions, results);
@@ -457,7 +461,7 @@ public class HTablePool implements Close
 
     /**
      * Returns the actual table back to the pool
-     * 
+     *
      * @throws IOException
      */
     public void close() throws IOException {
@@ -465,37 +469,6 @@ public class HTablePool implements Close
     }
 
     @Override
-    public RowLock lockRow(byte[] row) throws IOException {
-      return table.lockRow(row);
-    }
-
-    @Override
-    public void unlockRow(RowLock rl) throws IOException {
-      table.unlockRow(rl);
-    }
-
-    @Override
-    public <T extends CoprocessorProtocol> T coprocessorProxy(
-        Class<T> protocol, byte[] row) {
-      return table.coprocessorProxy(protocol, row);
-    }
-
-    @Override
-    public <T extends CoprocessorProtocol, R> Map<byte[], R> coprocessorExec(
-        Class<T> protocol, byte[] startKey, byte[] endKey,
-        Batch.Call<T, R> callable) throws IOException, Throwable {
-      return table.coprocessorExec(protocol, startKey, endKey, callable);
-    }
-
-    @Override
-    public <T extends CoprocessorProtocol, R> void coprocessorExec(
-        Class<T> protocol, byte[] startKey, byte[] endKey,
-        Batch.Call<T, R> callable, Batch.Callback<R> callback)
-        throws IOException, Throwable {
-      table.coprocessorExec(protocol, startKey, endKey, callable, callback);
-    }
-
-    @Override
     public CoprocessorRpcChannel coprocessorService(byte[] row) {
       return table.coprocessorService(row);
     }
@@ -521,7 +494,7 @@ public class HTablePool implements Close
 
     /**
      * Expose the wrapped HTable to tests in the same package
-     * 
+     *
      * @return wrapped htable
      */
     HTableInterface getWrappedTable() {
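
A minimal sketch of the pooled lifecycle recommended by the class Javadoc above; the table name is an assumption:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.HTableInterface;
    import org.apache.hadoop.hbase.client.HTablePool;

    Configuration conf = HBaseConfiguration.create();
    HTablePool pool = new HTablePool(conf, 10);  // retain at most 10 references per table
    HTableInterface table = pool.getTable("mytable");
    try {
      // Use the table; the new exists(List<Get>) passes straight through the pooled wrapper.
    } finally {
      table.close();  // returns the instance to the pool; preferred over deprecated putTable()
    }
    pool.close();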

Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/client/Increment.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/client/Increment.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/client/Increment.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/client/Increment.java Thu Feb 14 12:58:12 2013
@@ -45,7 +45,6 @@ import org.apache.hadoop.hbase.util.Byte
 @InterfaceStability.Stable
 public class Increment implements Row {
   private byte [] row = null;
-  private long lockId = -1L;
   private boolean writeToWAL = true;
   private TimeRange tr = new TimeRange();
   private Map<byte [], NavigableMap<byte [], Long>> familyMap =
@@ -55,31 +54,17 @@ public class Increment implements Row {
   public Increment() {}
 
   /**
-   * Create a Increment operation for the specified row.
-   * <p>
-   * At least one column must be incremented.
-   * @param row row key
-   */
-  public Increment(byte [] row) {
-    this(row, null);
-  }
-
-  /**
    * Create a Increment operation for the specified row, using an existing row
    * lock.
    * <p>
    * At least one column must be incremented.
    * @param row row key
-   * @param rowLock previously acquired row lock, or null
    */
-  public Increment(byte [] row, RowLock rowLock) {
+  public Increment(byte [] row) {
     if (row == null) {
       throw new IllegalArgumentException("Cannot increment a null row");
     }
     this.row = row;
-    if(rowLock != null) {
-      this.lockId = rowLock.getLockId();
-    }
   }
 
   /**
@@ -119,22 +104,6 @@ public class Increment implements Row {
   }
 
   /**
-   * Method for retrieving the increment's RowLock
-   * @return RowLock
-   */
-  public RowLock getRowLock() {
-    return new RowLock(this.row, this.lockId);
-  }
-
-  /**
-   * Method for retrieving the increment's lockId
-   * @return lockId
-   */
-  public long getLockId() {
-    return this.lockId;
-  }
-
-  /**
    * Method for retrieving whether WAL will be written to or not
    * @return true if WAL should be used, false if not
    */
@@ -274,4 +243,16 @@ public class Increment implements Row {
   public int compareTo(Row i) {
     return Bytes.compareTo(this.getRow(), i.getRow());
   }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null || getClass() != obj.getClass()) {
+      return false;
+    }
+    Row other = (Row) obj;
+    return compareTo(other) == 0;
+  }
+
+  @Override
+  public int hashCode() {
+    // Keep hashCode() consistent with equals(), which compares only the row key.
+    return Bytes.hashCode(this.row);
+  }
 }
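
With the RowLock constructor removed, an Increment is built from the row alone. A minimal sketch; row, family and qualifier values are assumptions:

    import org.apache.hadoop.hbase.client.Increment;
    import org.apache.hadoop.hbase.util.Bytes;

    Increment inc = new Increment(Bytes.toBytes("row1"));
    inc.addColumn(Bytes.toBytes("f"), Bytes.toBytes("hits"), 1L);
    // equals() now delegates to compareTo(), so two Increments on the same row compare equal.
    boolean sameRow = inc.equals(new Increment(Bytes.toBytes("row1")));  // true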

Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/client/Mutation.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/client/Mutation.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/client/Mutation.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/client/Mutation.java Thu Feb 14 12:58:12 2013
@@ -40,7 +40,6 @@ public abstract class Mutation extends O
 
   protected byte [] row = null;
   protected long ts = HConstants.LATEST_TIMESTAMP;
-  protected long lockId = -1L;
   protected boolean writeToWAL = true;
   protected Map<byte [], List<KeyValue>> familyMap =
       new TreeMap<byte [], List<KeyValue>>(Bytes.BYTES_COMPARATOR);
@@ -165,23 +164,6 @@ public abstract class Mutation extends O
   }
 
   /**
-   * Method for retrieving the delete's RowLock
-   * @return RowLock
-   */
-  public RowLock getRowLock() {
-    return new RowLock(this.row, this.lockId);
-  }
-
-  /**
-   * Method for retrieving the delete's lock ID.
-   *
-   * @return The lock ID.
-   */
-  public long getLockId() {
-  return this.lockId;
-  }
-
-  /**
    * Method for retrieving the timestamp
    * @return timestamp
    */
@@ -194,6 +176,7 @@ public abstract class Mutation extends O
    * @param clusterId
    */
   public void setClusterId(UUID clusterId) {
+    if (clusterId == null) return;
     byte[] val = new byte[2*Bytes.SIZEOF_LONG];
     Bytes.putLong(val, 0, clusterId.getMostSignificantBits());
     Bytes.putLong(val, Bytes.SIZEOF_LONG, clusterId.getLeastSignificantBits());

Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/client/OperationWithAttributes.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/client/OperationWithAttributes.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/client/OperationWithAttributes.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/client/OperationWithAttributes.java Thu Feb 14 12:58:12 2013
@@ -35,7 +35,7 @@ public abstract class OperationWithAttri
   private Map<String, byte[]> attributes;
 
   // used for uniquely identifying an operation
-  static public String ID_ATRIBUTE = "_operation.attributes.id";
+  public static final String ID_ATRIBUTE = "_operation.attributes.id";
 
   public void setAttribute(String name, byte[] value) {
     if (attributes == null && value == null) {
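
With ID_ATRIBUTE now a proper constant, it can be referenced as an attribute key without risk of reassignment. A minimal sketch; the row and id values are assumptions:

    import org.apache.hadoop.hbase.client.OperationWithAttributes;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.util.Bytes;

    Put put = new Put(Bytes.toBytes("row1"));
    // Tag the operation with a unique id, e.g. for tracing it through logs.
    put.setAttribute(OperationWithAttributes.ID_ATRIBUTE, Bytes.toBytes("op-42"));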

Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/client/Put.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/client/Put.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/client/Put.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/client/Put.java Thu Feb 14 12:58:12 2013
@@ -46,7 +46,7 @@ import java.util.TreeMap;
 public class Put extends Mutation implements HeapSize, Comparable<Row> {
   private static final long OVERHEAD = ClassSize.align(
       ClassSize.OBJECT + 2 * ClassSize.REFERENCE +
-      2 * Bytes.SIZEOF_LONG + Bytes.SIZEOF_BOOLEAN +
+      1 * Bytes.SIZEOF_LONG + Bytes.SIZEOF_BOOLEAN +
       ClassSize.REFERENCE + ClassSize.TREEMAP);
 
   /**
@@ -54,16 +54,7 @@ public class Put extends Mutation implem
    * @param row row key
    */
   public Put(byte [] row) {
-    this(row, null);
-  }
-
-  /**
-   * Create a Put operation for the specified row, using an existing row lock.
-   * @param row row key
-   * @param rowLock previously acquired row lock, or null
-   */
-  public Put(byte [] row, RowLock rowLock) {
-      this(row, HConstants.LATEST_TIMESTAMP, rowLock);
+    this(row, HConstants.LATEST_TIMESTAMP);
   }
 
   /**
@@ -73,24 +64,11 @@ public class Put extends Mutation implem
    * @param ts timestamp
    */
   public Put(byte[] row, long ts) {
-    this(row, ts, null);
-  }
-
-  /**
-   * Create a Put operation for the specified row, using a given timestamp, and an existing row lock.
-   * @param row row key
-   * @param ts timestamp
-   * @param rowLock previously acquired row lock, or null
-   */
-  public Put(byte [] row, long ts, RowLock rowLock) {
     if(row == null || row.length > HConstants.MAX_ROW_LENGTH) {
       throw new IllegalArgumentException("Row key is invalid");
     }
     this.row = Arrays.copyOf(row, row.length);
     this.ts = ts;
-    if(rowLock != null) {
-      this.lockId = rowLock.getLockId();
-    }
   }
 
   /**
@@ -98,7 +76,7 @@ public class Put extends Mutation implem
    * @param putToCopy put to copy
    */
   public Put(Put putToCopy) {
-    this(putToCopy.getRow(), putToCopy.ts, putToCopy.getRowLock());
+    this(putToCopy.getRow(), putToCopy.ts);
     this.familyMap =
       new TreeMap<byte [], List<KeyValue>>(Bytes.BYTES_COMPARATOR);
     for(Map.Entry<byte [], List<KeyValue>> entry :
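
The surviving Put constructors take only a row and an optional timestamp. A minimal sketch; row, timestamp, family, qualifier and value are assumptions:

    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.util.Bytes;

    Put put = new Put(Bytes.toBytes("row1"), 1360845492000L);  // explicit timestamp
    put.add(Bytes.toBytes("f"), Bytes.toBytes("q"), Bytes.toBytes("v"));
    Put copy = new Put(put);  // the copy no longer carries a lock id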

Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/client/RetriesExhaustedException.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/client/RetriesExhaustedException.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/client/RetriesExhaustedException.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/client/RetriesExhaustedException.java Thu Feb 14 12:58:12 2013
@@ -77,8 +77,10 @@ public class RetriesExhaustedException e
    * @param exceptions List of exceptions that failed before giving up
    */
   public RetriesExhaustedException(final int numTries,
-      final List<ThrowableWithExtraContext> exceptions) {
-    super(getMessage(numTries, exceptions));
+                                   final List<ThrowableWithExtraContext> exceptions) {
+    super(getMessage(numTries, exceptions),
+        (exceptions != null && !exceptions.isEmpty() ?
+            exceptions.get(exceptions.size() - 1).t : null));
   }
 
   private static String getMessage(String callableVitals, int numTries,
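
Because the last underlying failure is now chained as the cause, callers can inspect it directly. A minimal sketch; 'table' and 'get' are assumptions:

    import org.apache.hadoop.hbase.client.RetriesExhaustedException;

    try {
      table.get(get);
    } catch (RetriesExhaustedException ree) {
      Throwable last = ree.getCause();  // the last exception seen before giving up, or null
      if (last != null) {
        last.printStackTrace();
      }
    }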

Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/client/RetriesExhaustedWithDetailsException.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/client/RetriesExhaustedWithDetailsException.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/client/RetriesExhaustedWithDetailsException.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/client/RetriesExhaustedWithDetailsException.java Thu Feb 14 12:58:12 2013
@@ -22,7 +22,10 @@ package org.apache.hadoop.hbase.client;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.util.Bytes;
 
+import java.io.PrintWriter;
+import java.io.StringWriter;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -54,7 +57,7 @@ extends RetriesExhaustedException {
     super("Failed " + exceptions.size() + " action" +
         pluralize(exceptions) + ": " +
         getDesc(exceptions, actions, hostnameAndPort));
-
+    
     this.exceptions = exceptions;
     this.actions = actions;
     this.hostnameAndPort = hostnameAndPort;
@@ -116,6 +119,24 @@ extends RetriesExhaustedException {
     return s;
   }
 
+  public String getExhaustiveDescription() {
+    StringWriter errorWriter = new StringWriter();
+    for (int i = 0; i < this.exceptions.size(); ++i) {
+      Throwable t = this.exceptions.get(i);
+      Row action = this.actions.get(i);
+      String server = this.hostnameAndPort.get(i);
+      errorWriter.append("Error #" + i + " from [" + server + "] for ["
+        + ((action == null) ? "unknown key" : Bytes.toStringBinary(action.getRow())) + "]\n");
+      if (t != null) {
+        PrintWriter pw = new PrintWriter(errorWriter);
+        t.printStackTrace(pw);
+        pw.flush();
+      }
+    }
+    return errorWriter.toString();
+  }
+
   public static Map<String, Integer> classifyExs(List<Throwable> ths) {
     Map<String, Integer> cls = new HashMap<String, Integer>();
     for (Throwable t : ths) {
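
The new getExhaustiveDescription() complements the summary message with per-action detail. A minimal sketch of a caller; 'table', 'actions' and 'results' are assumptions:

    import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;

    try {
      table.batch(actions, results);
    } catch (RetriesExhaustedWithDetailsException e) {
      // One entry per failure: server, row key and, when available, the full stack trace.
      System.err.println(e.getExhaustiveDescription());
    }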

Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/client/Scan.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/client/Scan.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/client/Scan.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/client/Scan.java Thu Feb 14 12:58:12 2013
@@ -92,12 +92,13 @@ public class Scan extends OperationWithA
 
   private int storeLimit = -1;
   private int storeOffset = 0;
-  
+  private boolean getScan;
+
   // If application wants to collect scan metrics, it needs to
   // call scan.setAttribute(SCAN_ATTRIBUTES_ENABLE, Bytes.toBytes(Boolean.TRUE))
-  static public String SCAN_ATTRIBUTES_METRICS_ENABLE =
+  public static final String SCAN_ATTRIBUTES_METRICS_ENABLE =
     "scan.attributes.metrics.enable";
-  static public String SCAN_ATTRIBUTES_METRICS_DATA =
+  public static final String SCAN_ATTRIBUTES_METRICS_DATA =
     "scan.attributes.metrics.data";
 
   /*
@@ -110,6 +111,7 @@ public class Scan extends OperationWithA
   private TimeRange tr = new TimeRange();
   private Map<byte [], NavigableSet<byte []>> familyMap =
     new TreeMap<byte [], NavigableSet<byte []>>(Bytes.BYTES_COMPARATOR);
+  private Boolean loadColumnFamiliesOnDemand = null;
 
   /**
    * Create a Scan operation across all rows.
@@ -140,6 +142,8 @@ public class Scan extends OperationWithA
   public Scan(byte [] startRow, byte [] stopRow) {
     this.startRow = startRow;
     this.stopRow = stopRow;
+    // The scan is a Get when the startRow is non-empty and equals the stopRow.
+    this.getScan = isStartRowAndEqualsStopRow();
   }
 
   /**
@@ -158,7 +162,9 @@ public class Scan extends OperationWithA
     caching = scan.getCaching();
     maxResultSize = scan.getMaxResultSize();
     cacheBlocks = scan.getCacheBlocks();
+    getScan = scan.isGetScan();
     filter = scan.getFilter(); // clone?
+    loadColumnFamiliesOnDemand = scan.getLoadColumnFamiliesOnDemandValue();
     TimeRange ctr = scan.getTimeRange();
     tr = new TimeRange(ctr.getMin(), ctr.getMax());
     Map<byte[], NavigableSet<byte[]>> fams = scan.getFamilyMap();
@@ -192,13 +198,17 @@ public class Scan extends OperationWithA
     this.storeOffset = get.getRowOffsetPerColumnFamily();
     this.tr = get.getTimeRange();
     this.familyMap = get.getFamilyMap();
+    this.getScan = true;
   }
 
   public boolean isGetScan() {
-    return this.startRow != null && this.startRow.length > 0 &&
-      Bytes.equals(this.startRow, this.stopRow);
+    return this.getScan || isStartRowAndEqualsStopRow();
   }
 
+  private boolean isStartRowAndEqualsStopRow() {
+    return this.startRow != null && this.startRow.length > 0 &&
+        Bytes.equals(this.startRow, this.stopRow);
+  }
+
   /**
    * Get all columns from the specified family.
    * <p>
@@ -519,6 +529,41 @@ public class Scan extends OperationWithA
   }
 
   /**
+   * Set the value indicating whether loading CFs on demand should be allowed (cluster
+   * default is false). On-demand CF loading does not load column families until necessary; e.g.,
+   * if you filter on one column, data for the other column families is loaded only for the rows
+   * that are included in the result, not for all rows as in the normal case.
+   * With column-specific filters, like SingleColumnValueFilter w/filterIfMissing == true,
+   * this can deliver huge perf gains when there's a cf with lots of data; however, it can
+   * also lead to some inconsistent results, as follows:
+   * - if someone does a concurrent update to both column families in question, you may get a row
+   *   that never existed; e.g., given { rowKey = 5, { cat_videos => 1 }, { video => "my cat" } },
+   *   if someone puts rowKey 5 with { cat_videos => 0 }, { video => "my dog" }, a concurrent scan
+   *   filtering on "cat_videos == 1" can get { rowKey = 5, { cat_videos => 1 },
+   *   { video => "my dog" } }.
+   * - if there's a concurrent split and you have more than 2 column families, some rows may be
+   *   missing some column families.
+   */
+  public void setLoadColumnFamiliesOnDemand(boolean value) {
+    this.loadColumnFamiliesOnDemand = value;
+  }
+
+  /**
+   * Get the raw loadColumnFamiliesOnDemand setting; may be null if it has not been set.
+   */
+  public Boolean getLoadColumnFamiliesOnDemandValue() {
+    return this.loadColumnFamiliesOnDemand;
+  }
+
+  /**
+   * Get the logical value indicating whether on-demand CF loading should be allowed.
+   */
+  public boolean doLoadColumnFamiliesOnDemand() {
+    return (this.loadColumnFamiliesOnDemand != null)
+      && this.loadColumnFamiliesOnDemand.booleanValue();
+  }
+
+  /**
    * Compile the table and column family (i.e. schema) information
    * into a String. Useful for parsing and aggregation by debugging,
    * logging, and administration tools.
@@ -547,7 +592,7 @@ public class Scan extends OperationWithA
    * Useful for debugging, logging, and administration tools.
    * @param maxCols a limit on the number of columns output prior to truncation
    * @return Map
-   */ 
+   */
   @Override
   public Map<String, Object> toMap(int maxCols) {
     // start with the fingerpring map and build on top of it
@@ -564,6 +609,7 @@ public class Scan extends OperationWithA
     map.put("caching", this.caching);
     map.put("maxResultSize", this.maxResultSize);
     map.put("cacheBlocks", this.cacheBlocks);
+    map.put("loadColumnFamiliesOnDemand", this.loadColumnFamiliesOnDemand);
     List<Long> timeRange = new ArrayList<Long>();
     timeRange.add(this.tr.getMin());
     timeRange.add(this.tr.getMax());
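
A minimal sketch of the on-demand loading knob combined with the column-specific filter mentioned in the Javadoc above; the family, qualifier and value are assumptions:

    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
    import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
    import org.apache.hadoop.hbase.util.Bytes;

    Scan scan = new Scan();
    SingleColumnValueFilter filter = new SingleColumnValueFilter(
        Bytes.toBytes("cat_videos"), Bytes.toBytes("flag"),
        CompareOp.EQUAL, Bytes.toBytes(1));
    filter.setFilterIfMissing(true);
    scan.setFilter(filter);
    // Load the other (potentially large) column families only for rows that pass the filter.
    scan.setLoadColumnFamiliesOnDemand(true);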

Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java Thu Feb 14 12:58:12 2013
@@ -140,8 +140,9 @@ public class ScannerCallable extends Ser
           incRPCcallsMetrics();
           ScanRequest request =
             RequestConverter.buildScanRequest(scannerId, caching, false, nextCallSeq);
+          ScanResponse response = null;
           try {
-            ScanResponse response = server.scan(null, request);
+            response = server.scan(null, request);
             // Client and RS maintain a nextCallSeq number during the scan. Every next() call
             // from client to server will increment this number in both sides. Client passes this
             // number along with the request and at RS side both the incoming nextCallSeq and its
@@ -171,7 +172,7 @@ public class ScannerCallable extends Ser
           } catch (ServiceException se) {
             throw ProtobufUtil.getRemoteException(se);
           }
-          updateResultsMetrics(rrs);
+          updateResultsMetrics(response);
         } catch (IOException e) {
           if (logScannerActivity) {
             LOG.info("Got exception in fetching from scanner="
@@ -226,22 +227,15 @@ public class ScannerCallable extends Ser
     }
   }
 
-  private void updateResultsMetrics(Result[] rrs) {
-    if (this.scanMetrics == null || rrs == null) {
+  private void updateResultsMetrics(ScanResponse response) {
+    if (this.scanMetrics == null || !response.hasResultSizeBytes()) {
       return;
     }
-    /*
-     * broken by protobufs
-    for (Result rr : rrs) {
-      if (rr.getBytes() != null) {
-        this.scanMetrics.countOfBytesInResults.inc(rr.getBytes().getLength());
-        if (isRegionServerRemote) {
-          this.scanMetrics.countOfBytesInRemoteResults.inc(
-            rr.getBytes().getLength());
-        }
-      }
+    long value = response.getResultSizeBytes();
+    this.scanMetrics.countOfBytesInResults.addAndGet(value);
+    if (isRegionServerRemote) {
+      this.scanMetrics.countOfBytesInRemoteResults.addAndGet(value);
     }
-    */
   }
 
   private void close() {

Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ServerCallable.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ServerCallable.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ServerCallable.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ServerCallable.java Thu Feb 14 12:58:12 2013
@@ -33,8 +33,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionLocation;
-import org.apache.hadoop.hbase.client.ClientProtocol;
-import org.apache.hadoop.hbase.ipc.HBaseRPC;
+import org.apache.hadoop.hbase.ipc.HBaseClientRPC;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.ipc.RemoteException;
 
@@ -113,12 +112,12 @@ public abstract class ServerCallable<T> 
   }
 
   public void beforeCall() {
-    HBaseRPC.setRpcTimeout(this.callTimeout);
+    HBaseClientRPC.setRpcTimeout(this.callTimeout);
     this.startTime = System.currentTimeMillis();
   }
 
   public void afterCall() {
-    HBaseRPC.resetRpcTimeout();
+    HBaseClientRPC.resetRpcTimeout();
     this.endTime = System.currentTimeMillis();
   }
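
The ServerCallable plus withRetries() pattern used throughout HTable above looks like this in isolation; it assumes connection, tableName, row, operationTimeout and get are in scope, as they are inside HTable:

    import java.io.IOException;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.ServerCallable;
    import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
    import org.apache.hadoop.hbase.protobuf.RequestConverter;
    import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetRequest;
    import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetResponse;
    import com.google.protobuf.ServiceException;

    Result r = new ServerCallable<Result>(connection, tableName, row, operationTimeout) {
      public Result call() throws IOException {
        try {
          // beforeCall()/afterCall() bracket each attempt, setting and then resetting
          // the RPC timeout through HBaseClientRPC.
          GetRequest request = RequestConverter.buildGetRequest(
              location.getRegionInfo().getRegionName(), get);
          GetResponse response = server.get(null, request);
          return ProtobufUtil.toResult(response.getResult());
        } catch (ServiceException se) {
          throw ProtobufUtil.getRemoteException(se);
        }
      }
    }.withRetries();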
 

Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java Thu Feb 14 12:58:12 2013
@@ -20,6 +20,10 @@
 package org.apache.hadoop.hbase.client.coprocessor;
 
 import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
@@ -51,6 +55,7 @@ import org.apache.hadoop.hbase.util.Byte
 import org.apache.hadoop.hbase.util.Pair;
 
 import com.google.protobuf.ByteString;
+import com.google.protobuf.Message;
 
 /**
  * This client class is for invoking the aggregate functions deployed on the
@@ -98,7 +103,8 @@ public class AggregationClient {
    *           The caller is supposed to handle the exception as they are thrown
    *           & propagated to it.
    */
-  public <R, S> R max(final byte[] tableName, final ColumnInterpreter<R, S> ci,
+  public <R, S, P extends Message, Q extends Message, T extends Message> 
+  R max(final byte[] tableName, final ColumnInterpreter<R, S, P, Q, T> ci,
       final Scan scan) throws Throwable {
     final AggregateArgument requestArg = validateArgAndGetPB(scan, ci);
     class MaxCallBack implements Batch.Callback<R> {
@@ -130,9 +136,9 @@ public class AggregationClient {
                 throw controller.getFailedOn();
               }
               if (response.getFirstPartCount() > 0) {
-                return ci.castToCellType(
-                          ci.parseResponseAsPromotedType(
-                              getBytesFromResponse(response.getFirstPart(0))));
+                ByteString b = response.getFirstPart(0);
+                Q q = ProtobufUtil.getParsedGenericInstance(ci.getClass(), 3, b);
+                return ci.getCellValueFromProto(q);
               }
               return null;
             }
@@ -168,7 +174,8 @@ public class AggregationClient {
    * @return min val <R>
    * @throws Throwable
    */
-  public <R, S> R min(final byte[] tableName, final ColumnInterpreter<R, S> ci,
+  public <R, S, P extends Message, Q extends Message, T extends Message> 
+  R min(final byte[] tableName, final ColumnInterpreter<R, S, P, Q, T> ci,
       final Scan scan) throws Throwable {
     final AggregateArgument requestArg = validateArgAndGetPB(scan, ci);
     class MinCallBack implements Batch.Callback<R> {
@@ -202,9 +209,9 @@ public class AggregationClient {
                 throw controller.getFailedOn();
               }
               if (response.getFirstPartCount() > 0) {
-                return ci.castToCellType(
-                  ci.parseResponseAsPromotedType(
-                      getBytesFromResponse(response.getFirstPart(0))));
+                ByteString b = response.getFirstPart(0);
+                Q q = ProtobufUtil.getParsedGenericInstance(ci.getClass(), 3, b);
+                return ci.getCellValueFromProto(q);
               }
               return null;
             }
@@ -231,8 +238,9 @@ public class AggregationClient {
    * @return <R, S>
    * @throws Throwable
    */
-  public <R, S> long rowCount(final byte[] tableName,
-      final ColumnInterpreter<R, S> ci, final Scan scan) throws Throwable {
+  public <R, S, P extends Message, Q extends Message, T extends Message> 
+  long rowCount(final byte[] tableName,
+      final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan) throws Throwable {
     final AggregateArgument requestArg = validateArgAndGetPB(scan, ci);
     class RowNumCallback implements Batch.Callback<Long> {
       private final AtomicLong rowCountL = new AtomicLong(0);
@@ -285,7 +293,8 @@ public class AggregationClient {
    * @return sum <S>
    * @throws Throwable
    */
-  public <R, S> S sum(final byte[] tableName, final ColumnInterpreter<R, S> ci,
+  public <R, S, P extends Message, Q extends Message, T extends Message> 
+  S sum(final byte[] tableName, final ColumnInterpreter<R, S, P, Q, T> ci,
       final Scan scan) throws Throwable {
     final AggregateArgument requestArg = validateArgAndGetPB(scan, ci);
     
@@ -320,8 +329,10 @@ public class AggregationClient {
               if (response.getFirstPartCount() == 0) {
                 return null;
               }
-              return ci.parseResponseAsPromotedType(
-                  getBytesFromResponse(response.getFirstPart(0)));
+              ByteString b = response.getFirstPart(0);
+              T t = ProtobufUtil.getParsedGenericInstance(ci.getClass(), 4, b);
+              S s = ci.getPromotedValueFromProto(t);
+              return s;
             }
           }, sumCallBack);
     } finally {
@@ -340,8 +351,9 @@ public class AggregationClient {
    * @param scan
    * @throws Throwable
    */
-  private <R, S> Pair<S, Long> getAvgArgs(final byte[] tableName,
-      final ColumnInterpreter<R, S> ci, final Scan scan) throws Throwable {
+  private <R, S, P extends Message, Q extends Message, T extends Message>
+  Pair<S, Long> getAvgArgs(final byte[] tableName,
+      final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan) throws Throwable {
     final AggregateArgument requestArg = validateArgAndGetPB(scan, ci);
     class AvgCallBack implements Batch.Callback<Pair<S, Long>> {
       S sum = null;
@@ -379,8 +391,10 @@ public class AggregationClient {
               if (response.getFirstPartCount() == 0) {
                 return pair;
               }
-              pair.setFirst(ci.parseResponseAsPromotedType(
-                  getBytesFromResponse(response.getFirstPart(0))));
+              ByteString b = response.getFirstPart(0);
+              T t = ProtobufUtil.getParsedGenericInstance(ci.getClass(), 4, b);
+              S s = ci.getPromotedValueFromProto(t);
+              pair.setFirst(s);
               ByteBuffer bb = ByteBuffer.allocate(8).put(
                   getBytesFromResponse(response.getSecondPart()));
               bb.rewind();
@@ -408,8 +422,9 @@ public class AggregationClient {
    * @return <R, S>
    * @throws Throwable
    */
-  public <R, S> double avg(final byte[] tableName,
-      final ColumnInterpreter<R, S> ci, Scan scan) throws Throwable {
+  public <R, S, P extends Message, Q extends Message, T extends Message>
+  double avg(final byte[] tableName,
+      final ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) throws Throwable {
     Pair<S, Long> p = getAvgArgs(tableName, ci, scan);
     return ci.divideForAvg(p.getFirst(), p.getSecond());
   }
@@ -425,8 +440,9 @@ public class AggregationClient {
    * @return
    * @throws Throwable
    */
-  private <R, S> Pair<List<S>, Long> getStdArgs(final byte[] tableName,
-      final ColumnInterpreter<R, S> ci, final Scan scan) throws Throwable {
+  private <R, S, P extends Message, Q extends Message, T extends Message>
+  Pair<List<S>, Long> getStdArgs(final byte[] tableName,
+      final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan) throws Throwable {
     final AggregateArgument requestArg = validateArgAndGetPB(scan, ci);
     class StdCallback implements Batch.Callback<Pair<List<S>, Long>> {
       long rowCountVal = 0l;
@@ -474,8 +490,10 @@ public class AggregationClient {
               }
               List<S> list = new ArrayList<S>();
               for (int i = 0; i < response.getFirstPartCount(); i++) {
-                list.add(ci.parseResponseAsPromotedType(
-                    getBytesFromResponse(response.getFirstPart(i))));
+                ByteString b = response.getFirstPart(i);
+                T t = ProtobufUtil.getParsedGenericInstance(ci.getClass(), 4, b);
+                S s = ci.getPromotedValueFromProto(t);
+                list.add(s);
               }
               pair.setFirst(list);
               ByteBuffer bb = ByteBuffer.allocate(8).put(
@@ -505,7 +523,8 @@ public class AggregationClient {
    * @return <R, S>
    * @throws Throwable
    */
-  public <R, S> double std(final byte[] tableName, ColumnInterpreter<R, S> ci,
+  public <R, S, P extends Message, Q extends Message, T extends Message>
+  double std(final byte[] tableName, ColumnInterpreter<R, S, P, Q, T> ci,
       Scan scan) throws Throwable {
     Pair<List<S>, Long> p = getStdArgs(tableName, ci, scan);
     double res = 0d;
@@ -528,9 +547,10 @@ public class AggregationClient {
    *  (sum of values, sum of weights) for all the regions chosen
    * @throws Throwable
    */
-  private <R, S> Pair<NavigableMap<byte[], List<S>>, List<S>>
+  private <R, S, P extends Message, Q extends Message, T extends Message>
+  Pair<NavigableMap<byte[], List<S>>, List<S>>
   getMedianArgs(final byte[] tableName,
-      final ColumnInterpreter<R, S> ci, final Scan scan) throws Throwable {
+      final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan) throws Throwable {
     final AggregateArgument requestArg = validateArgAndGetPB(scan, ci);
     final NavigableMap<byte[], List<S>> map =
       new TreeMap<byte[], List<S>>(Bytes.BYTES_COMPARATOR);
@@ -572,8 +592,10 @@ public class AggregationClient {
 
               List<S> list = new ArrayList<S>();
               for (int i = 0; i < response.getFirstPartCount(); i++) {
-                list.add(ci.parseResponseAsPromotedType(
-                    getBytesFromResponse(response.getFirstPart(i))));
+                ByteString b = response.getFirstPart(i);
+                T t = ProtobufUtil.getParsedGenericInstance(ci.getClass(), 4, b);
+                S s = ci.getPromotedValueFromProto(t);
+                list.add(s);
               }
               return list;
             }
@@ -597,7 +619,8 @@ public class AggregationClient {
    * @return R the median
    * @throws Throwable
    */
-  public <R, S> R median(final byte[] tableName, ColumnInterpreter<R, S> ci,
+  public <R, S, P extends Message, Q extends Message, T extends Message>
+  R median(final byte[] tableName, ColumnInterpreter<R, S, P, Q, T> ci,
       Scan scan) throws Throwable {
     Pair<NavigableMap<byte[], List<S>>, List<S>> p = getMedianArgs(tableName, ci, scan);
     byte[] startRow = null;
@@ -672,16 +695,17 @@ public class AggregationClient {
     return null;
   }
 
-  <R,S>AggregateArgument validateArgAndGetPB(Scan scan, ColumnInterpreter<R,S> ci)
+  <R, S, P extends Message, Q extends Message, T extends Message> AggregateArgument 
+  validateArgAndGetPB(Scan scan, ColumnInterpreter<R,S,P,Q,T> ci)
       throws IOException {
     validateParameters(scan);
     final AggregateArgument.Builder requestBuilder = 
         AggregateArgument.newBuilder();
     requestBuilder.setInterpreterClassName(ci.getClass().getCanonicalName());
-    ByteString columnInterpreterSpecificData = null;
-    if ((columnInterpreterSpecificData = ci.columnInterpreterSpecificData()) 
+    P columnInterpreterSpecificData = null;
+    if ((columnInterpreterSpecificData = ci.getRequestData()) 
        != null) {
-      requestBuilder.setInterpreterSpecificBytes(columnInterpreterSpecificData);
+      requestBuilder.setInterpreterSpecificBytes(columnInterpreterSpecificData.toByteString());
     }
     requestBuilder.setScan(ProtobufUtil.toScan(scan));
     return requestBuilder.build();
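
Taken together, the AggregationClient hunks above replace the old byte[]-parsing path (parseResponseAsPromotedType / getBytesFromResponse) with protobuf Messages: every aggregate method gains three Message type parameters, and responses are decoded by reflecting on the interpreter's generic signature. A minimal caller-side sketch, assuming a cluster with the aggregate coprocessor loaded; the table, family, and qualifier names are hypothetical:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.client.coprocessor.AggregationClient;
    import org.apache.hadoop.hbase.client.coprocessor.LongColumnInterpreter;
    import org.apache.hadoop.hbase.util.Bytes;

    public class AggregationClientSketch {
      public static void main(String[] args) throws Throwable {
        Configuration conf = HBaseConfiguration.create();
        AggregationClient aggClient = new AggregationClient(conf);
        Scan scan = new Scan();
        scan.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q")); // hypothetical names
        // The five type parameters <R, S, P, Q, T> are inferred from the
        // interpreter: LongColumnInterpreter binds them to
        // <Long, Long, EmptyMsg, LongMsg, LongMsg> after this patch.
        Long max = aggClient.max(Bytes.toBytes("testTable"), // hypothetical table
            new LongColumnInterpreter(), scan);
        System.out.println("max = " + max);
      }
    }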

Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/client/coprocessor/Batch.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/client/coprocessor/Batch.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/client/coprocessor/Batch.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/client/coprocessor/Batch.java Thu Feb 14 12:58:12 2013
@@ -19,18 +19,10 @@
 
 package org.apache.hadoop.hbase.client.coprocessor;
 
-import org.apache.commons.lang.reflect.MethodUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import java.io.IOException;
+
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
-
-import java.io.IOException;
-import java.lang.reflect.InvocationHandler;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.lang.reflect.Proxy;
 
 
 /**
@@ -40,109 +32,20 @@ import java.lang.reflect.Proxy;
 @InterfaceAudience.Public
 @InterfaceStability.Evolving
 public abstract class Batch {
-  private static Log LOG = LogFactory.getLog(Batch.class);
-
-  /**
-   * Creates a new {@link Batch.Call} instance that invokes a method
-   * with the given parameters and returns the result.
-   *
-   * <p>
-   * Note that currently the method is naively looked up using the method name
-   * and class types of the passed arguments, which means that
-   * <em>none of the arguments can be <code>null</code></em>.
-   * For more flexibility, see
-   * {@link Batch#forMethod(java.lang.reflect.Method, Object...)}.
-   * </p>
-   *
-   * @param protocol the protocol class being called
-   * @param method the method name
-   * @param args zero or more arguments to be passed to the method
-   * (individual args cannot be <code>null</code>!)
-   * @param <T> the class type of the protocol implementation being invoked
-   * @param <R> the return type for the method call
-   * @return a {@code Callable} instance that will invoke the given method
-   * and return the results
-   * @throws NoSuchMethodException if the method named, with the given argument
-   *     types, cannot be found in the protocol class
-   * @see Batch#forMethod(java.lang.reflect.Method, Object...)
-   * @see org.apache.hadoop.hbase.client.HTable#coprocessorExec(Class, byte[], byte[], org.apache.hadoop.hbase.client.coprocessor.Batch.Call, org.apache.hadoop.hbase.client.coprocessor.Batch.Callback)
-   */
-  @Deprecated
-  public static <T extends CoprocessorProtocol,R> Call<T,R> forMethod(
-      final Class<T> protocol, final String method, final Object... args)
-  throws NoSuchMethodException {
-    Class[] types = new Class[args.length];
-    for (int i=0; i<args.length; i++) {
-      if (args[i] == null) {
-        throw new NullPointerException("Method argument cannot be null");
-      }
-      types[i] = args[i].getClass();
-    }
-
-    Method m = MethodUtils.getMatchingAccessibleMethod(protocol, method, types);
-    if (m == null) {
-      throw new NoSuchMethodException("No matching method found for '" +
-          method + "'");
-    }
-
-    m.setAccessible(true);
-    return forMethod(m, args);
-  }
-
-  /**
-   * Creates a new {@link Batch.Call} instance that invokes a method
-   * with the given parameters and returns the result.
-   *
-   * @param method the method reference to invoke
-   * @param args zero or more arguments to be passed to the method
-   * @param <T> the class type of the protocol implementation being invoked
-   * @param <R> the return type for the method call
-   * @return a {@code Callable} instance that will invoke the given method and
-   * return the results
-   * @see org.apache.hadoop.hbase.client.HTable#coprocessorExec(Class, byte[], byte[], org.apache.hadoop.hbase.client.coprocessor.Batch.Call, org.apache.hadoop.hbase.client.coprocessor.Batch.Callback)
-   */
-  @Deprecated
-  public static <T extends CoprocessorProtocol,R> Call<T,R> forMethod(
-      final Method method, final Object... args) {
-    return new Call<T,R>() {
-        public R call(T instance) throws IOException {
-          try {
-            if (Proxy.isProxyClass(instance.getClass())) {
-              InvocationHandler invoker = Proxy.getInvocationHandler(instance);
-              return (R)invoker.invoke(instance, method, args);
-            } else {
-              LOG.warn("Non proxied invocation of method '"+method.getName()+"'!");
-              return (R)method.invoke(instance, args);
-            }
-          }
-          catch (IllegalAccessException iae) {
-            throw new IOException("Unable to invoke method '"+
-                method.getName()+"'", iae);
-          }
-          catch (InvocationTargetException ite) {
-            throw new IOException(ite.toString(), ite);
-          }
-          catch (Throwable t) {
-            throw new IOException(t.toString(), t);
-          }
-        }
-    };
-  }
-
   /**
    * Defines a unit of work to be executed.
    *
    * <p>
    * When used with
-   * {@link org.apache.hadoop.hbase.client.HTable#coprocessorExec(Class, byte[], byte[], org.apache.hadoop.hbase.client.coprocessor.Batch.Call, org.apache.hadoop.hbase.client.coprocessor.Batch.Callback)}
+   * {@link org.apache.hadoop.hbase.client.HTable#coprocessorService(Class, byte[], byte[], org.apache.hadoop.hbase.client.coprocessor.Batch.Call)}
    * the implementations {@link Batch.Call#call(Object)} method will be invoked
    * with a proxy to the
-   * {@link org.apache.hadoop.hbase.ipc.CoprocessorProtocol}
+   * {@link org.apache.hadoop.hbase.coprocessor.CoprocessorService}
    * sub-type instance.
    * </p>
    * @see org.apache.hadoop.hbase.client.coprocessor
-   * @see org.apache.hadoop.hbase.client.HTable#coprocessorExec(Class, byte[], byte[], org.apache.hadoop.hbase.client.coprocessor.Batch.Call)
-   * @see org.apache.hadoop.hbase.client.HTable#coprocessorExec(Class, byte[], byte[], org.apache.hadoop.hbase.client.coprocessor.Batch.Call, org.apache.hadoop.hbase.client.coprocessor.Batch.Callback)
+   * @see org.apache.hadoop.hbase.client.HTable#coprocessorService(byte[])
+   * @see org.apache.hadoop.hbase.client.HTable#coprocessorService(Class, byte[], byte[], org.apache.hadoop.hbase.client.coprocessor.Batch.Call)
    * @param <T> the instance type to be passed to
    * {@link Batch.Call#call(Object)}
    * @param <R> the return type from {@link Batch.Call#call(Object)}
@@ -157,15 +60,15 @@ public abstract class Batch {
    *
    * <p>
    * When used with
-   * {@link org.apache.hadoop.hbase.client.HTable#coprocessorExec(Class, byte[], byte[], org.apache.hadoop.hbase.client.coprocessor.Batch.Call, org.apache.hadoop.hbase.client.coprocessor.Batch.Callback)},
+   * {@link org.apache.hadoop.hbase.client.HTable#coprocessorService(Class, byte[], byte[], org.apache.hadoop.hbase.client.coprocessor.Batch.Call)}
    * the implementation's {@link Batch.Callback#update(byte[], byte[], Object)}
    * method will be called with the {@link Batch.Call#call(Object)} return value
    * from each region in the selected range.
    * </p>
    * @param <R> the return type from the associated {@link Batch.Call#call(Object)}
-   * @see org.apache.hadoop.hbase.client.HTable#coprocessorExec(Class, byte[], byte[], org.apache.hadoop.hbase.client.coprocessor.Batch.Call, org.apache.hadoop.hbase.client.coprocessor.Batch.Callback)
+   * @see org.apache.hadoop.hbase.client.HTable#coprocessorService(Class, byte[], byte[], org.apache.hadoop.hbase.client.coprocessor.Batch.Call)
    */
   public static interface Callback<R> {
     public void update(byte[] region, byte[] row, R result);
   }
-}
+}
\ No newline at end of file
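
With the deprecated forMethod helpers removed, a Batch.Call is now written directly against the protobuf-generated service stub that HTable#coprocessorService hands out. A hedged sketch of the shape such a call takes; the AggregateProtos package for the generated classes and the ipc-package locations are assumed from this branch's layout, and the blocking-RPC plumbing mirrors what AggregationClient does above:

    import java.io.IOException;

    import org.apache.hadoop.hbase.client.coprocessor.Batch;
    import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
    import org.apache.hadoop.hbase.ipc.ServerRpcController;
    import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateArgument;
    import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateResponse;
    import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateService;

    public class RowCountCall {
      // Builds a Batch.Call that invokes the getRowNum endpoint on each region
      // and hands the raw AggregateResponse back to the caller to decode.
      static Batch.Call<AggregateService, AggregateResponse> forRowNum(
          final AggregateArgument requestArg) {
        return new Batch.Call<AggregateService, AggregateResponse>() {
          @Override
          public AggregateResponse call(AggregateService instance) throws IOException {
            ServerRpcController controller = new ServerRpcController();
            BlockingRpcCallback<AggregateResponse> done =
                new BlockingRpcCallback<AggregateResponse>();
            instance.getRowNum(controller, requestArg, done);
            AggregateResponse response = done.get();
            if (controller.failedOnException()) {
              throw controller.getFailedOn();
            }
            return response;
          }
        };
      }
    }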

Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/client/coprocessor/LongColumnInterpreter.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/client/coprocessor/LongColumnInterpreter.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/client/coprocessor/LongColumnInterpreter.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/client/coprocessor/LongColumnInterpreter.java Thu Feb 14 12:58:12 2013
@@ -19,16 +19,15 @@
 package org.apache.hadoop.hbase.client.coprocessor;
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.coprocessor.ColumnInterpreter;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.EmptyMsg;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.LongMsg;
 import org.apache.hadoop.hbase.util.Bytes;
 
-import com.google.protobuf.ByteString;
-
 /**
  * a concrete column interpreter implementation. The cell value is a Long value
  * and its promoted data type is also a Long value. For computing aggregation
@@ -39,7 +38,8 @@ import com.google.protobuf.ByteString;
  */
 @InterfaceAudience.Public
 @InterfaceStability.Evolving
-public class LongColumnInterpreter implements ColumnInterpreter<Long, Long> {
+public class LongColumnInterpreter extends ColumnInterpreter<Long, Long,
+                 EmptyMsg, LongMsg, LongMsg> {
 
   public Long getValue(byte[] colFamily, byte[] colQualifier, KeyValue kv)
       throws IOException {
@@ -97,45 +97,40 @@ public class LongColumnInterpreter imple
     return o;
   }
 
-
   @Override
-  public Long parseResponseAsPromotedType(byte[] response) {
-    ByteBuffer b = ByteBuffer.allocate(8).put(response);
-    b.rewind();
-    long l = b.getLong();
+  public Long castToCellType(Long l) {
     return l;
   }
 
   @Override
-  public Long castToCellType(Long l) {
-    return l;
+  public EmptyMsg getRequestData() {
+    return EmptyMsg.getDefaultInstance();
   }
 
   @Override
-  public ByteString columnInterpreterSpecificData() {
-    // nothing
-    return null;
+  public void initialize(EmptyMsg msg) {
+    //nothing 
   }
 
   @Override
-  public void initialize(ByteString bytes) {
-    // nothing
+  public LongMsg getProtoForCellType(Long t) {
+    LongMsg.Builder builder = LongMsg.newBuilder();
+    return builder.setLongMsg(t).build();
   }
 
   @Override
-  public ByteString getProtoForCellType(Long t) {
-    return getProtoForPromotedOrCellType(t);
+  public LongMsg getProtoForPromotedType(Long s) {
+    LongMsg.Builder builder = LongMsg.newBuilder();
+    return builder.setLongMsg(s).build();
   }
 
   @Override
-  public ByteString getProtoForPromotedType(Long s) {
-    return getProtoForPromotedOrCellType(s);
+  public Long getPromotedValueFromProto(LongMsg r) {
+    return r.getLongMsg();
   }
 
-  private ByteString getProtoForPromotedOrCellType(Long s) {
-    ByteBuffer bb = ByteBuffer.allocate(8).putLong(s);
-    bb.rewind();
-    ByteString bs = ByteString.copyFrom(bb);
-    return bs;
+  @Override
+  public Long getCellValueFromProto(LongMsg q) {
+    return q.getLongMsg();
   }
-}
\ No newline at end of file
+}
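
The LongColumnInterpreter rewrite above drops the hand-rolled ByteBuffer packing in favor of the generated LongMsg message, so a cell value round-trips through protobuf on both ends of the RPC. A small self-contained sketch of that round trip:

    import org.apache.hadoop.hbase.client.coprocessor.LongColumnInterpreter;
    import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.LongMsg;

    public class LongInterpreterRoundTrip {
      public static void main(String[] args) throws Exception {
        LongColumnInterpreter ci = new LongColumnInterpreter();
        // Cell value -> wire message -> bytes, as the server side now does it.
        LongMsg onTheWire = ci.getProtoForCellType(42L);
        byte[] bytes = onTheWire.toByteString().toByteArray();
        // Bytes -> wire message -> cell value, as the client side now does it.
        LongMsg parsed = LongMsg.parseFrom(bytes);
        System.out.println(ci.getCellValueFromProto(parsed)); // prints 42
      }
    }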

Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java Thu Feb 14 12:58:12 2013
@@ -185,7 +185,7 @@ public class ReplicationAdmin implements
     boolean prev = true;
     try {
       prev = getReplicating();
-      this.replicationZk.setReplicating(newState);
+      this.replicationZk.setReplication(newState);
     } catch (KeeperException e) {
       throw new IOException("Unable to set the replication state", e);
     }

Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/constraint/Constraints.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/constraint/Constraints.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/constraint/Constraints.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/constraint/Constraints.java Thu Feb 14 12:58:12 2013
@@ -133,7 +133,7 @@ public final class Constraints {
     }
     // now remove all the keys we found
     for (ImmutableBytesWritable key : keys) {
-      desc.remove(key.get());
+      desc.remove(key);
     }
   }
 

Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateImplementation.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateImplementation.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateImplementation.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateImplementation.java Thu Feb 14 12:58:12 2013
@@ -19,6 +19,10 @@
 package org.apache.hadoop.hbase.coprocessor;
 
 import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
@@ -42,6 +46,7 @@ import org.apache.hadoop.hbase.protobuf.
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
 
 import com.google.protobuf.ByteString;
+import com.google.protobuf.Message;
 import com.google.protobuf.RpcCallback;
 import com.google.protobuf.RpcController;
 import com.google.protobuf.Service;
@@ -49,14 +54,20 @@ import com.google.protobuf.Service;
 /**
  * A concrete AggregateProtocol implementation. Its system level coprocessor
  * that computes the aggregate function at a region level.
- * @param <T>
- * @param <S>
+ * {@link ColumnInterpreter} is used to interpret the column values. This class
+ * is parameterized with the same five types as {@link ColumnInterpreter};
+ * refer to that class for a fuller description of each:
+ * @param <T> Cell value data type
+ * @param <S> Promoted data type
+ * @param <P> PB message used to transport interpreter-specific bytes
+ * @param <Q> PB message used to transport the Cell (<T>) instance
+ * @param <R> PB message used to transport the Promoted (<S>) instance
  */
 @InterfaceAudience.Public
 @InterfaceStability.Evolving
-public class AggregateImplementation<T, S> extends AggregateService implements
-    CoprocessorService, Coprocessor {
-  protected static Log log = LogFactory.getLog(AggregateImplementation.class);
+public class AggregateImplementation<T, S, P extends Message, Q extends Message, R extends Message> 
+extends AggregateService implements CoprocessorService, Coprocessor {
+  protected static final Log log = LogFactory.getLog(AggregateImplementation.class);
   private RegionCoprocessorEnvironment env;
 
   /**
@@ -73,7 +84,7 @@ public class AggregateImplementation<T, 
     AggregateResponse response = null;
     T max = null;
     try {
-      ColumnInterpreter<T, S> ci = constructColumnInterpreterFromRequest(request);
+      ColumnInterpreter<T, S, P, Q, R> ci = constructColumnInterpreterFromRequest(request);
       T temp;
       Scan scan = ProtobufUtil.toScan(request.getScan());
       scanner = env.getRegion().getScanner(scan);
@@ -96,7 +107,7 @@ public class AggregateImplementation<T, 
       } while (hasMoreRows);
       if (max != null) {
         AggregateResponse.Builder builder = AggregateResponse.newBuilder();
-        builder.addFirstPart(ci.getProtoForCellType(max));
+        builder.addFirstPart(ci.getProtoForCellType(max).toByteString());
         response = builder.build();
       }
     } catch (IOException e) {
@@ -127,7 +138,7 @@ public class AggregateImplementation<T, 
     InternalScanner scanner = null;
     T min = null;
     try {
-      ColumnInterpreter<T, S> ci = constructColumnInterpreterFromRequest(request);
+      ColumnInterpreter<T, S, P, Q, R> ci = constructColumnInterpreterFromRequest(request);
       T temp;
       Scan scan = ProtobufUtil.toScan(request.getScan());
       scanner = env.getRegion().getScanner(scan);
@@ -149,7 +160,7 @@ public class AggregateImplementation<T, 
       } while (hasMoreRows);
       if (min != null) {
         response = AggregateResponse.newBuilder().addFirstPart( 
-          ci.getProtoForCellType(min)).build();
+          ci.getProtoForCellType(min).toByteString()).build();
       }
     } catch (IOException e) {
       ResponseConverter.setControllerException(controller, e);
@@ -179,7 +190,7 @@ public class AggregateImplementation<T, 
     InternalScanner scanner = null;
     long sum = 0l;
     try {
-      ColumnInterpreter<T, S> ci = constructColumnInterpreterFromRequest(request);
+      ColumnInterpreter<T, S, P, Q, R> ci = constructColumnInterpreterFromRequest(request);
       S sumVal = null;
       T temp;
       Scan scan = ProtobufUtil.toScan(request.getScan());
@@ -203,7 +214,7 @@ public class AggregateImplementation<T, 
       } while (hasMoreRows);
       if (sumVal != null) {
         response = AggregateResponse.newBuilder().addFirstPart( 
-          ci.getProtoForPromotedType(sumVal)).build();
+          ci.getProtoForPromotedType(sumVal).toByteString()).build();
       }
     } catch (IOException e) {
       ResponseConverter.setControllerException(controller, e);
@@ -287,7 +298,7 @@ public class AggregateImplementation<T, 
     AggregateResponse response = null;
     InternalScanner scanner = null;
     try {
-      ColumnInterpreter<T, S> ci = constructColumnInterpreterFromRequest(request);
+      ColumnInterpreter<T, S, P, Q, R> ci = constructColumnInterpreterFromRequest(request);
       S sumVal = null;
       Long rowCountVal = 0l;
       Scan scan = ProtobufUtil.toScan(request.getScan());
@@ -311,7 +322,7 @@ public class AggregateImplementation<T, 
         rowCountVal++;
       } while (hasMoreRows);
       if (sumVal != null) {
-        ByteString first = ci.getProtoForPromotedType(sumVal);
+        ByteString first = ci.getProtoForPromotedType(sumVal).toByteString();
         AggregateResponse.Builder pair = AggregateResponse.newBuilder();
         pair.addFirstPart(first);
         ByteBuffer bb = ByteBuffer.allocate(8).putLong(rowCountVal);
@@ -346,7 +357,7 @@ public class AggregateImplementation<T, 
     InternalScanner scanner = null;
     AggregateResponse response = null;
     try {
-      ColumnInterpreter<T, S> ci = constructColumnInterpreterFromRequest(request);
+      ColumnInterpreter<T, S, P, Q, R> ci = constructColumnInterpreterFromRequest(request);
       S sumVal = null, sumSqVal = null, tempVal = null;
       long rowCountVal = 0l;
       Scan scan = ProtobufUtil.toScan(request.getScan());
@@ -374,8 +385,8 @@ public class AggregateImplementation<T, 
         rowCountVal++;
       } while (hasMoreRows);
       if (sumVal != null) {
-        ByteString first_sumVal = ci.getProtoForPromotedType(sumVal);
-        ByteString first_sumSqVal = ci.getProtoForPromotedType(sumSqVal);
+        ByteString first_sumVal = ci.getProtoForPromotedType(sumVal).toByteString();
+        ByteString first_sumSqVal = ci.getProtoForPromotedType(sumSqVal).toByteString();
         AggregateResponse.Builder pair = AggregateResponse.newBuilder();
         pair.addFirstPart(first_sumVal);
         pair.addFirstPart(first_sumSqVal);
@@ -410,7 +421,7 @@ public class AggregateImplementation<T, 
     AggregateResponse response = null;
     InternalScanner scanner = null;
     try {
-      ColumnInterpreter<T, S> ci = constructColumnInterpreterFromRequest(request);
+      ColumnInterpreter<T, S, P, Q, R> ci = constructColumnInterpreterFromRequest(request);
       S sumVal = null, sumWeights = null, tempVal = null, tempWeight = null;
       Scan scan = ProtobufUtil.toScan(request.getScan());
       scanner = env.getRegion().getScanner(scan);
@@ -442,9 +453,9 @@ public class AggregateImplementation<T, 
         sumVal = ci.add(sumVal, tempVal);
         sumWeights = ci.add(sumWeights, tempWeight);
       } while (hasMoreRows);
-      ByteString first_sumVal = ci.getProtoForPromotedType(sumVal);
+      ByteString first_sumVal = ci.getProtoForPromotedType(sumVal).toByteString();
       S s = sumWeights == null ? ci.castToReturnType(ci.getMinValue()) : sumWeights;
-      ByteString first_sumWeights = ci.getProtoForPromotedType(s);
+      ByteString first_sumWeights = ci.getProtoForPromotedType(s).toByteString();
       AggregateResponse.Builder pair = AggregateResponse.newBuilder();
       pair.addFirstPart(first_sumVal);
       pair.addFirstPart(first_sumWeights); 
@@ -462,15 +473,17 @@ public class AggregateImplementation<T, 
   }
 
   @SuppressWarnings("unchecked")
-  ColumnInterpreter<T,S> constructColumnInterpreterFromRequest(
+  ColumnInterpreter<T,S,P,Q,R> constructColumnInterpreterFromRequest(
       AggregateArgument request) throws IOException {
     String className = request.getInterpreterClassName();
     Class<?> cls;
     try {
       cls = Class.forName(className);
-      ColumnInterpreter<T,S> ci = (ColumnInterpreter<T, S>) cls.newInstance();
+      ColumnInterpreter<T,S,P,Q,R> ci = (ColumnInterpreter<T, S, P, Q, R>) cls.newInstance();
       if (request.hasInterpreterSpecificBytes()) {
-        ci.initialize(request.getInterpreterSpecificBytes());
+        ByteString b = request.getInterpreterSpecificBytes();
+        P initMsg = ProtobufUtil.getParsedGenericInstance(ci.getClass(), 2, b);
+        ci.initialize(initMsg);
       }
       return ci;
     } catch (ClassNotFoundException e) {
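
Both sides of the aggregate RPC now lean on ProtobufUtil.getParsedGenericInstance(clazz, position, bytes) to recover a concrete Message from a ByteString, using the type bound at the given position of the interpreter's generic superclass (0-based, so position 2 is P on the server, while 3 and 4 are Q and T on the client). That helper is added to ProtobufUtil elsewhere in this commit; the standalone sketch below only illustrates the reflection technique involved, not the exact ProtobufUtil code:

    import java.lang.reflect.Method;
    import java.lang.reflect.ParameterizedType;
    import java.lang.reflect.Type;

    import com.google.protobuf.ByteString;
    import com.google.protobuf.Message;

    public final class GenericProtoParser {
      // Parses b into the Message type bound at the given position of the
      // runtime class's generic superclass; e.g. position 3 of a class
      // extending ColumnInterpreter<R, S, P, Q, T> yields Q.
      @SuppressWarnings("unchecked")
      static <M extends Message> M parse(Class<?> runtimeClass, int position,
          ByteString b) throws Exception {
        Type superType = runtimeClass.getGenericSuperclass();
        Type argType = ((ParameterizedType) superType)
            .getActualTypeArguments()[position];
        Class<M> msgClass = (Class<M>) argType;
        // Every generated protobuf Message exposes a static parseFrom(ByteString).
        Method parseFrom = msgClass.getMethod("parseFrom", ByteString.class);
        return (M) parseFrom.invoke(null, b);
      }
    }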

Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java?rev=1446147&r1=1446146&r2=1446147&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java Thu Feb 14 12:58:12 2013
@@ -140,7 +140,8 @@ public abstract class BaseRegionObserver
 
   @Override
   public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> e,
-      final HStore store, final InternalScanner scanner) throws IOException {
+      final HStore store, final InternalScanner scanner, final ScanType scanType)
+          throws IOException {
     return scanner;
   }
 
@@ -315,6 +316,12 @@ public abstract class BaseRegionObserver
   }
 
   @Override
+  public boolean postScannerFilterRow(final ObserverContext<RegionCoprocessorEnvironment> e,
+      final InternalScanner s, final byte[] currentRow, final boolean hasMore) throws IOException {
+    return hasMore;
+  }
+  
+  @Override
   public void preScannerClose(final ObserverContext<RegionCoprocessorEnvironment> e,
       final InternalScanner s) throws IOException {
   }
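
The BaseRegionObserver hunks add a ScanType argument to preCompact and introduce a postScannerFilterRow hook, invoked after the region server filters a row out of scanner results. A hedged sketch of a custom observer using the new hook; the class name and the logging it does are illustrative:

    import java.io.IOException;

    import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
    import org.apache.hadoop.hbase.coprocessor.ObserverContext;
    import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
    import org.apache.hadoop.hbase.regionserver.InternalScanner;
    import org.apache.hadoop.hbase.util.Bytes;

    public class FilteredRowLoggingObserver extends BaseRegionObserver {
      @Override
      public boolean postScannerFilterRow(ObserverContext<RegionCoprocessorEnvironment> e,
          InternalScanner s, byte[] currentRow, boolean hasMore) throws IOException {
        // Fires after currentRow has been filtered out; returning hasMore
        // unchanged preserves the scanner's own continuation decision.
        System.out.println("filtered row: " + Bytes.toStringBinary(currentRow));
        return hasMore;
      }
    }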


