hbase-commits mailing list archives

From: ecl...@apache.org
Subject: svn commit: r1449950 [6/35] - in /hbase/trunk: ./ hbase-client/ hbase-client/src/ hbase-client/src/main/ hbase-client/src/main/java/ hbase-client/src/main/java/org/ hbase-client/src/main/java/org/apache/ hbase-client/src/main/java/org/apache/hadoop/ hb...
Date: Mon, 25 Feb 2013 22:50:29 GMT
Added: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java?rev=1449950&view=auto
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java (added)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java Mon Feb 25 22:50:17 2013
@@ -0,0 +1,403 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
+import org.apache.hadoop.hbase.exceptions.DoNotRetryIOException;
+import org.apache.hadoop.hbase.exceptions.NotServingRegionException;
+import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
+import org.apache.hadoop.hbase.exceptions.RegionServerStoppedException;
+import org.apache.hadoop.hbase.exceptions.UnknownScannerException;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedList;
+
+/**
+ * Implements the scanner interface for the HBase client.
+ * If there are multiple regions in a table, this scanner will iterate
+ * through them all.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public class ClientScanner extends AbstractClientScanner {
+    private final Log LOG = LogFactory.getLog(this.getClass());
+    private Scan scan;
+    private boolean closed = false;
+    // Current region scanner is against.  Gets cleared if current region goes
+    // wonky: e.g. if it splits on us.
+    private HRegionInfo currentRegion = null;
+    private ScannerCallable callable = null;
+    private final LinkedList<Result> cache = new LinkedList<Result>();
+    private final int caching;
+    private long lastNext;
+    // Keep lastResult returned successfully in case we have to reset scanner.
+    private Result lastResult = null;
+    private ScanMetrics scanMetrics = null;
+    private final long maxScannerResultSize;
+    private final HConnection connection;
+    private final byte[] tableName;
+    private final int scannerTimeout;
+
+    /**
+     * Create a new ClientScanner for the specified table. An HConnection will be
+     * retrieved using the passed Configuration.
+     * Note that the passed {@link Scan}'s start row may be changed.
+     *
+     * @param conf The {@link Configuration} to use.
+     * @param scan {@link Scan} to use in this scanner
+     * @param tableName The table that we wish to scan
+     * @throws IOException
+     */
+    public ClientScanner(final Configuration conf, final Scan scan,
+        final byte[] tableName) throws IOException {
+      this(conf, scan, tableName, HConnectionManager.getConnection(conf));
+    }
+
+    /**
+     * Create a new ClientScanner for the specified table.
+     * Note that the passed {@link Scan}'s start row may be changed.
+     *
+     * @param conf The {@link Configuration} to use.
+     * @param scan {@link Scan} to use in this scanner
+     * @param tableName The table that we wish to scan
+     * @param connection Connection identifying the cluster
+     * @throws IOException
+     */
+    public ClientScanner(final Configuration conf, final Scan scan,
+      final byte[] tableName, HConnection connection) throws IOException {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Creating scanner over "
+            + Bytes.toString(tableName)
+            + " starting at key '" + Bytes.toStringBinary(scan.getStartRow()) + "'");
+      }
+      this.scan = scan;
+      this.tableName = tableName;
+      this.lastNext = System.currentTimeMillis();
+      this.connection = connection;
+      if (scan.getMaxResultSize() > 0) {
+        this.maxScannerResultSize = scan.getMaxResultSize();
+      } else {
+        this.maxScannerResultSize = conf.getLong(
+          HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY,
+          HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE);
+      }
+      this.scannerTimeout = conf.getInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
+        HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
+
+      // check if application wants to collect scan metrics
+      byte[] enableMetrics = scan.getAttribute(
+        Scan.SCAN_ATTRIBUTES_METRICS_ENABLE);
+      if (enableMetrics != null && Bytes.toBoolean(enableMetrics)) {
+        scanMetrics = new ScanMetrics();
+      }
+
+      // Use the caching from the Scan.  If not set, use the default cache setting
+      // for this table.
+      if (this.scan.getCaching() > 0) {
+        this.caching = this.scan.getCaching();
+      } else {
+        this.caching = conf.getInt(
+            HConstants.HBASE_CLIENT_SCANNER_CACHING,
+            HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING);
+      }
+
+      // initialize the scanner
+      nextScanner(this.caching, false);
+    }
+
+    protected HConnection getConnection() {
+      return this.connection;
+    }
+
+    protected byte[] getTableName() {
+      return this.tableName;
+    }
+
+    protected Scan getScan() {
+      return scan;
+    }
+
+    protected long getTimestamp() {
+      return lastNext;
+    }
+
+    // Returns true if the passed region endKey is at or past the scan's stop
+    // row, i.e. this is the last region we need to scan.
+    private boolean checkScanStopRow(final byte [] endKey) {
+      if (this.scan.getStopRow().length > 0) {
+        // there is a stop row, check to see if we are past it.
+        byte [] stopRow = scan.getStopRow();
+        int cmp = Bytes.compareTo(stopRow, 0, stopRow.length,
+          endKey, 0, endKey.length);
+        if (cmp <= 0) {
+          // stopRow <= endKey (endKey is equal to or larger than stopRow)
+          // This is a stop.
+          return true;
+        }
+      }
+      return false; //unlikely.
+    }
+
+    /*
+     * Gets a scanner for the next region.  If this.currentRegion != null, then
+     * we will move to the endrow of this.currentRegion.  Else we will get
+     * scanner at the scan.getStartRow().  We will go no further, just tidy
+     * up outstanding scanners, if <code>currentRegion != null</code> and
+     * <code>done</code> is true.
+     * @param nbRows
+     * @param done Server-side says we're done scanning.
+     */
+    private boolean nextScanner(int nbRows, final boolean done)
+    throws IOException {
+      // Close the previous scanner if it's open
+      if (this.callable != null) {
+        this.callable.setClose();
+        callable.withRetries();
+        this.callable = null;
+      }
+
+      // Where to start the next scanner
+      byte [] localStartKey;
+
+      // if we're at end of table, close and return false to stop iterating
+      if (this.currentRegion != null) {
+        byte [] endKey = this.currentRegion.getEndKey();
+        if (endKey == null ||
+            Bytes.equals(endKey, HConstants.EMPTY_BYTE_ARRAY) ||
+            checkScanStopRow(endKey) ||
+            done) {
+          close();
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Finished with scanning at " + this.currentRegion);
+          }
+          return false;
+        }
+        localStartKey = endKey;
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Finished with region " + this.currentRegion);
+        }
+      } else {
+        localStartKey = this.scan.getStartRow();
+      }
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Advancing internal scanner to startKey at '" +
+          Bytes.toStringBinary(localStartKey) + "'");
+      }
+      try {
+        callable = getScannerCallable(localStartKey, nbRows);
+        // Open a scanner on the region server starting at the
+        // beginning of the region
+        callable.withRetries();
+        this.currentRegion = callable.getHRegionInfo();
+        if (this.scanMetrics != null) {
+          this.scanMetrics.countOfRegions.incrementAndGet();
+        }
+      } catch (IOException e) {
+        close();
+        throw e;
+      }
+      return true;
+    }
+
+    protected ScannerCallable getScannerCallable(byte [] localStartKey,
+        int nbRows) {
+      scan.setStartRow(localStartKey);
+      ScannerCallable s = new ScannerCallable(getConnection(),
+        getTableName(), scan, this.scanMetrics);
+      s.setCaching(nbRows);
+      return s;
+    }
+
+    /**
+     * Publish the scan metrics. For now, we use scan.setAttribute to pass the metrics
+     * back to the application or TableInputFormat. Later, we could push it to other
+     * systems. We don't use the metrics framework because it doesn't support multiple
+     * instances of the same metrics on the same machine; for scan/map-reduce scenarios,
+     * we will have multiple scans running at the same time.
+     *
+     * By default, scan metrics are disabled; if the application wants to collect them,
+     * this behavior can be turned on by calling:
+     *
+     * scan.setAttribute(SCAN_ATTRIBUTES_METRICS_ENABLE, Bytes.toBytes(Boolean.TRUE))
+     */
+    private void writeScanMetrics() throws IOException {
+      if (this.scanMetrics == null) {
+        return;
+      }
+      MapReduceProtos.ScanMetrics pScanMetrics = ProtobufUtil.toScanMetrics(scanMetrics);
+      scan.setAttribute(Scan.SCAN_ATTRIBUTES_METRICS_DATA, pScanMetrics.toByteArray());
+    }
+
+    public Result next() throws IOException {
+      // If the scanner is closed and there's nothing left in the cache, next is a no-op.
+      if (cache.size() == 0 && this.closed) {
+        return null;
+      }
+      if (cache.size() == 0) {
+        Result [] values = null;
+        long remainingResultSize = maxScannerResultSize;
+        int countdown = this.caching;
+        // We need to reset it if it's a new callable that was created
+        // with a countdown in nextScanner
+        callable.setCaching(this.caching);
+        // This flag is set when we want to skip the result returned.  We do
+        // this when we reset scanner because it split under us.
+        boolean skipFirst = false;
+        boolean retryAfterOutOfOrderException  = true;
+        do {
+          try {
+            if (skipFirst) {
+              // Skip only the first row (which was the last row of the last
+              // already-processed batch).
+              callable.setCaching(1);
+              values = callable.withRetries();
+              callable.setCaching(this.caching);
+              skipFirst = false;
+            }
+            // The server returns null if scanning is to stop.  Else, it
+            // returns an empty array if scanning is to go on and we've just
+            // exhausted the current region.
+            values = callable.withRetries();
+            retryAfterOutOfOrderException  = true;
+          } catch (DoNotRetryIOException e) {
+            if (e instanceof UnknownScannerException) {
+              long timeout = lastNext + scannerTimeout;
+              // If we are over the timeout, throw this exception to the client
+              // Else, it's because the region moved and we used the old id
+              // against the new region server; reset the scanner.
+              if (timeout < System.currentTimeMillis()) {
+                long elapsed = System.currentTimeMillis() - lastNext;
+                ScannerTimeoutException ex = new ScannerTimeoutException(
+                    elapsed + "ms passed since the last invocation, " +
+                        "timeout is currently set to " + scannerTimeout);
+                ex.initCause(e);
+                throw ex;
+              }
+            } else {
+              Throwable cause = e.getCause();
+              if ((cause == null || (!(cause instanceof NotServingRegionException)
+                  && !(cause instanceof RegionServerStoppedException)))
+                  && !(e instanceof OutOfOrderScannerNextException)) {
+                throw e;
+              }
+            }
+            // Else, it's a signal from the depths of ScannerCallable that we got
+            // an NSRE on a next and that we need to reset the scanner.
+            if (this.lastResult != null) {
+              this.scan.setStartRow(this.lastResult.getRow());
+              // Skip first row returned.  We already let it out on previous
+              // invocation.
+              skipFirst = true;
+            }
+            if (e instanceof OutOfOrderScannerNextException) {
+              if (retryAfterOutOfOrderException) {
+                retryAfterOutOfOrderException = false;
+              } else {
+                throw new DoNotRetryIOException("Failed after retry"
+                    + ", it could be caused by rpc timeout", e);
+              }
+            }
+            // Clear region
+            this.currentRegion = null;
+            callable = null;
+            continue;
+          }
+          long currentTime = System.currentTimeMillis();
+          if (this.scanMetrics != null ) {
+            this.scanMetrics.sumOfMillisSecBetweenNexts.addAndGet(currentTime-lastNext);
+          }
+          lastNext = currentTime;
+          if (values != null && values.length > 0) {
+            for (Result rs : values) {
+              cache.add(rs);
+              for (KeyValue kv : rs.raw()) {
+                  remainingResultSize -= kv.heapSize();
+              }
+              countdown--;
+              this.lastResult = rs;
+            }
+          }
+          // Values == null means server-side filter has determined we must STOP
+        } while (remainingResultSize > 0 && countdown > 0
+            && nextScanner(countdown, values == null));
+      }
+
+      if (cache.size() > 0) {
+        return cache.poll();
+      }
+
+      // if we exhausted this scanner before calling close, write out the scan metrics
+      writeScanMetrics();
+      return null;
+    }
+
+    /**
+     * Get up to <code>nbRows</code> rows.
+     * How many RPCs are made is determined by the {@link Scan#setCaching(int)}
+     * setting (or hbase.client.scanner.caching in hbase-site.xml).
+     * @param nbRows number of rows to return
+     * @return Between zero and <code>nbRows</code> Results.  Scan is done
+     * if returned array is of zero-length (We never return null).
+     * @throws IOException
+     */
+    public Result [] next(int nbRows) throws IOException {
+      // Collect values to be returned here
+      ArrayList<Result> resultSets = new ArrayList<Result>(nbRows);
+      for(int i = 0; i < nbRows; i++) {
+        Result next = next();
+        if (next != null) {
+          resultSets.add(next);
+        } else {
+          break;
+        }
+      }
+      return resultSets.toArray(new Result[resultSets.size()]);
+    }
+
+    public void close() {
+      if (callable != null) {
+        callable.setClose();
+        try {
+          callable.withRetries();
+        } catch (IOException e) {
+          // We used to catch this error, interpret, and rethrow. However, we
+          // have since decided that it's not nice for a scanner's close to
+          // throw exceptions. Chances are it was just an UnknownScanner
+          // exception due to lease time out.
+        } finally {
+          // we want to output the scan metrics even if an error occurred on close
+          try {
+            writeScanMetrics();
+          } catch (IOException e) {
+            // As above, we still don't want the scanner close() method to throw.
+          }
+        }
+        callable = null;
+      }
+      closed = true;
+    }
+}
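
A quick usage sketch may help orient readers; it is not part of this commit. It drives
the new ClientScanner directly via the byte[] table-name constructor above, against a
hypothetical table "testtable" (applications would normally obtain a scanner through
HTable.getScanner(Scan) instead):

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.ClientScanner;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.util.Bytes;

    public class ScanExample {
      public static void main(String[] args) throws IOException {
        Configuration conf = HBaseConfiguration.create();
        Scan scan = new Scan();
        scan.setCaching(100); // rows per RPC; else hbase.client.scanner.caching applies
        // Opt in to scan metrics, as described by writeScanMetrics() above.
        scan.setAttribute(Scan.SCAN_ATTRIBUTES_METRICS_ENABLE, Bytes.toBytes(Boolean.TRUE));
        ClientScanner scanner = new ClientScanner(conf, scan, Bytes.toBytes("testtable"));
        try {
          for (Result r = scanner.next(); r != null; r = scanner.next()) {
            System.out.println(Bytes.toStringBinary(r.getRow()));
          }
        } finally {
          scanner.close(); // also publishes scan metrics onto the Scan's attributes
        }
      }
    }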

Added: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java?rev=1449950&view=auto
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java (added)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java Mon Feb 25 22:50:17 2013
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hbase.HConstants;
+
+import java.util.Random;
+
+/**
+ * Utility used by client connections such as {@link HConnection} and
+ * {@link ServerCallable}
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class ConnectionUtils {
+
+  private static final Random RANDOM = new Random();
+  /**
+   * Calculate pause time.
+   * Built on {@link HConstants#RETRY_BACKOFF}.
+   * @param pause
+   * @param tries
+   * @return How long to wait after <code>tries</code> retries
+   */
+  public static long getPauseTime(final long pause, final int tries) {
+    int ntries = tries;
+    if (ntries >= HConstants.RETRY_BACKOFF.length) {
+      ntries = HConstants.RETRY_BACKOFF.length - 1;
+    }
+
+    long normalPause = pause * HConstants.RETRY_BACKOFF[ntries];
+    long jitter =  (long)(normalPause * RANDOM.nextFloat() * 0.01f); // 1% possible jitter
+    return normalPause + jitter;
+  }
+
+
+  /**
+   * Applies a random jitter to a pause time: the result is the pause plus or
+   * minus up to <code>pause * jitter / 2</code>.  Minimum result is 1.
+   * @param pause the expected pause.
+   * @param jitter the jitter ratio, between 0 and 1, exclusive.
+   * @return the jittered pause, never less than 1
+   */
+  public static long addJitter(final long pause, final float jitter) {
+    float lag = pause * (RANDOM.nextFloat() - 0.5f) * jitter;
+    long newPause = pause + (long) lag;
+    if (newPause <= 0) {
+      return 1;
+    }
+    return newPause;
+  }
+}
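
For a feel of the schedule getPauseTime() produces, here is a sketch, not part of this
commit. It assumes the era's HConstants.RETRY_BACKOFF multipliers of
{1, 1, 1, 2, 2, 4, 4, 8, 16, 32} and a 1000 ms base pause (hbase.client.pause), so the
waits grow from roughly 1 s to roughly 32 s (plus up to 1% jitter) and then stay capped:

    import org.apache.hadoop.hbase.client.ConnectionUtils;

    public class BackoffExample {
      public static void main(String[] args) {
        long basePause = 1000L; // e.g. hbase.client.pause
        for (int tries = 0; tries < 12; tries++) {
          // tries beyond RETRY_BACKOFF.length reuse the last (largest) multiplier
          System.out.println("after try " + tries + ": wait ~"
              + ConnectionUtils.getPauseTime(basePause, tries) + " ms");
        }
      }
    }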

Added: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Delete.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Delete.java?rev=1449950&view=auto
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Delete.java (added)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Delete.java Mon Feb 25 22:50:17 2013
@@ -0,0 +1,250 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.client;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Used to perform Delete operations on a single row.
+ * <p>
+ * To delete an entire row, instantiate a Delete object with the row
+ * to delete.  To further define the scope of what to delete, perform
+ * additional methods as outlined below.
+ * <p>
+ * To delete specific families, execute {@link #deleteFamily(byte[]) deleteFamily}
+ * for each family to delete.
+ * <p>
+ * To delete multiple versions of specific columns, execute
+ * {@link #deleteColumns(byte[], byte[]) deleteColumns}
+ * for each column to delete.
+ * <p>
+ * To delete specific versions of specific columns, execute
+ * {@link #deleteColumn(byte[], byte[], long) deleteColumn}
+ * for each column version to delete.
+ * <p>
+ * When a timestamp is passed to deleteFamily or deleteColumns, all versions
+ * with a timestamp less than or equal to it are deleted.  If no timestamp
+ * is specified, an entry is added with a timestamp of 'now' where 'now' is
+ * the server's System.currentTimeMillis().
+ * Specifying a timestamp to the deleteColumn method will
+ * delete versions only with a timestamp equal to that specified.
+ * If no timestamp is passed to deleteColumn, internally, it figures the
+ * most recent cell's timestamp and adds a delete at that timestamp; i.e.
+ * it deletes the most recently added cell.
+ * <p>The timestamp passed to the constructor is used ONLY for delete of
+ * rows.  For anything less -- a deleteColumn, deleteColumns or
+ * deleteFamily -- then you need to use the method overrides that take a
+ * timestamp.  The constructor timestamp is not referenced.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public class Delete extends Mutation implements Comparable<Row> {
+  /**
+   * Create a Delete operation for the specified row.
+   * <p>
+   * If no further operations are done, this will delete everything
+   * associated with the specified row (all versions of all columns in all
+   * families).
+   * @param row row key
+   */
+  public Delete(byte [] row) {
+    this(row, HConstants.LATEST_TIMESTAMP);
+  }
+
+  /**
+   * Create a Delete operation for the specified row and timestamp.<p>
+   *
+   * If no further operations are done, this will delete all columns in all
+   * families of the specified row with a timestamp less than or equal to the
+   * specified timestamp.<p>
+   *
+   * This timestamp is ONLY used for a delete row operation.  If specifying
+   * families or columns, you must specify each timestamp individually.
+   * @param row row key
+   * @param timestamp maximum version timestamp (only for delete row)
+   */
+  public Delete(byte [] row, long timestamp) {
+    this.row = row;
+    this.ts = timestamp;
+  }
+
+  /**
+   * @param d Delete to clone.
+   */
+  public Delete(final Delete d) {
+    this.row = d.getRow();
+    this.ts = d.getTimeStamp();
+    this.familyMap.putAll(d.getFamilyMap());
+    this.writeToWAL = d.writeToWAL;
+  }
+
+  /**
+   * Advanced use only.
+   * Add an existing delete marker to this Delete object.
+   * @param kv An existing KeyValue of type "delete".
+   * @return this for invocation chaining
+   * @throws IOException
+   */
+  public Delete addDeleteMarker(KeyValue kv) throws IOException {
+    if (!kv.isDelete()) {
+      throw new IOException("The recently added KeyValue is not of type "
+          + "delete. Rowkey: " + Bytes.toStringBinary(this.row));
+    }
+    if (Bytes.compareTo(this.row, 0, row.length, kv.getBuffer(),
+        kv.getRowOffset(), kv.getRowLength()) != 0) {
+      throw new IOException("The row in the recently added KeyValue "
+          + Bytes.toStringBinary(kv.getBuffer(), kv.getRowOffset(),
+              kv.getRowLength()) + " doesn't match the original one "
+          + Bytes.toStringBinary(this.row));
+    }
+    byte [] family = kv.getFamily();
+    List<KeyValue> list = familyMap.get(family);
+    if (list == null) {
+      list = new ArrayList<KeyValue>();
+    }
+    list.add(kv);
+    familyMap.put(family, list);
+    return this;
+  }
+
+  /**
+   * Delete all versions of all columns of the specified family.
+   * <p>
+   * Overrides previous calls to deleteColumn and deleteColumns for the
+   * specified family.
+   * @param family family name
+   * @return this for invocation chaining
+   */
+  public Delete deleteFamily(byte [] family) {
+    this.deleteFamily(family, HConstants.LATEST_TIMESTAMP);
+    return this;
+  }
+
+  /**
+   * Delete all columns of the specified family with a timestamp less than
+   * or equal to the specified timestamp.
+   * <p>
+   * Overrides previous calls to deleteColumn and deleteColumns for the
+   * specified family.
+   * @param family family name
+   * @param timestamp maximum version timestamp
+   * @return this for invocation chaining
+   */
+  public Delete deleteFamily(byte [] family, long timestamp) {
+    List<KeyValue> list = familyMap.get(family);
+    if(list == null) {
+      list = new ArrayList<KeyValue>();
+    } else if(!list.isEmpty()) {
+      list.clear();
+    }
+    list.add(new KeyValue(row, family, null, timestamp, KeyValue.Type.DeleteFamily));
+    familyMap.put(family, list);
+    return this;
+  }
+
+  /**
+   * Delete all versions of the specified column.
+   * @param family family name
+   * @param qualifier column qualifier
+   * @return this for invocation chaining
+   */
+  public Delete deleteColumns(byte [] family, byte [] qualifier) {
+    this.deleteColumns(family, qualifier, HConstants.LATEST_TIMESTAMP);
+    return this;
+  }
+
+  /**
+   * Delete all versions of the specified column with a timestamp less than
+   * or equal to the specified timestamp.
+   * @param family family name
+   * @param qualifier column qualifier
+   * @param timestamp maximum version timestamp
+   * @return this for invocation chaining
+   */
+  public Delete deleteColumns(byte [] family, byte [] qualifier, long timestamp) {
+    List<KeyValue> list = familyMap.get(family);
+    if (list == null) {
+      list = new ArrayList<KeyValue>();
+    }
+    list.add(new KeyValue(this.row, family, qualifier, timestamp,
+      KeyValue.Type.DeleteColumn));
+    familyMap.put(family, list);
+    return this;
+  }
+
+  /**
+   * Delete the latest version of the specified column.
+   * This is an expensive call in that on the server-side, it first does a
+   * get to find the latest version's timestamp.  Then it adds a delete using
+   * the fetched cell's timestamp.
+   * @param family family name
+   * @param qualifier column qualifier
+   * @return this for invocation chaining
+   */
+  public Delete deleteColumn(byte [] family, byte [] qualifier) {
+    this.deleteColumn(family, qualifier, HConstants.LATEST_TIMESTAMP);
+    return this;
+  }
+
+  /**
+   * Delete the specified version of the specified column.
+   * @param family family name
+   * @param qualifier column qualifier
+   * @param timestamp version timestamp
+   * @return this for invocation chaining
+   */
+  public Delete deleteColumn(byte [] family, byte [] qualifier, long timestamp) {
+    List<KeyValue> list = familyMap.get(family);
+    if(list == null) {
+      list = new ArrayList<KeyValue>();
+    }
+    list.add(new KeyValue(
+        this.row, family, qualifier, timestamp, KeyValue.Type.Delete));
+    familyMap.put(family, list);
+    return this;
+  }
+
+  /**
+   * Set the timestamp of the delete.
+   * 
+   * @param timestamp
+   */
+  public void setTimestamp(long timestamp) {
+    this.ts = timestamp;
+  }
+
+  @Override
+  public Map<String, Object> toMap(int maxCols) {
+    // we start with the fingerprint map and build on top of it.
+    Map<String, Object> map = super.toMap(maxCols);
+    // why is put not doing this?
+    map.put("ts", this.ts);
+    return map;
+  }
+}
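
A short sketch of the scopes the class comment describes, not part of this commit
(table, row, family, and qualifier names are hypothetical):

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.Delete;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.util.Bytes;

    public class DeleteExample {
      public static void main(String[] args) throws IOException {
        Configuration conf = HBaseConfiguration.create();
        HTable table = new HTable(conf, "testtable");
        Delete d = new Delete(Bytes.toBytes("row1"));
        d.deleteFamily(Bytes.toBytes("cf1"));                 // all cells in cf1, all versions
        d.deleteColumns(Bytes.toBytes("cf2"), Bytes.toBytes("q1"), 1234L); // cf2:q1, ts <= 1234
        d.deleteColumn(Bytes.toBytes("cf2"), Bytes.toBytes("q2"));         // latest cf2:q2 only
        table.delete(d);
        table.close();
      }
    }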

Added: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Get.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Get.java?rev=1449950&view=auto
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Get.java (added)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Get.java Mon Feb 25 22:50:17 2013
@@ -0,0 +1,406 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.io.TimeRange;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableSet;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+/**
+ * Used to perform Get operations on a single row.
+ * <p>
+ * To get everything for a row, instantiate a Get object with the row to get.
+ * To further define the scope of what to get, perform additional methods as
+ * outlined below.
+ * <p>
+ * To get all columns from specific families, execute {@link #addFamily(byte[]) addFamily}
+ * for each family to retrieve.
+ * <p>
+ * To get specific columns, execute {@link #addColumn(byte[], byte[]) addColumn}
+ * for each column to retrieve.
+ * <p>
+ * To only retrieve columns within a specific range of version timestamps,
+ * execute {@link #setTimeRange(long, long) setTimeRange}.
+ * <p>
+ * To only retrieve columns with a specific timestamp, execute
+ * {@link #setTimeStamp(long) setTimeStamp}.
+ * <p>
+ * To limit the number of versions of each column to be returned, execute
+ * {@link #setMaxVersions(int) setMaxVersions}.
+ * <p>
+ * To add a filter, execute {@link #setFilter(Filter) setFilter}.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public class Get extends OperationWithAttributes
+  implements Row, Comparable<Row> {
+
+  private byte [] row = null;
+  private int maxVersions = 1;
+  private boolean cacheBlocks = true;
+  private int storeLimit = -1;
+  private int storeOffset = 0;
+  private Filter filter = null;
+  private TimeRange tr = new TimeRange();
+  private Map<byte [], NavigableSet<byte []>> familyMap =
+    new TreeMap<byte [], NavigableSet<byte []>>(Bytes.BYTES_COMPARATOR);
+
+  /**
+   * Create a Get operation for the specified row.
+   * <p>
+   * If no further operations are done, this will get the latest version of
+   * all columns in all families of the specified row.
+   * @param row row key
+   */
+  public Get(byte [] row) {
+    this.row = row;
+  }
+
+  /**
+   * Get all columns from the specified family.
+   * <p>
+   * Overrides previous calls to addColumn for this family.
+   * @param family family name
+   * @return the Get object
+   */
+  public Get addFamily(byte [] family) {
+    familyMap.remove(family);
+    familyMap.put(family, null);
+    return this;
+  }
+
+  /**
+   * Get the column from the specific family with the specified qualifier.
+   * <p>
+   * Overrides previous calls to addFamily for this family.
+   * @param family family name
+   * @param qualifier column qualifier
+   * @return the Get object
+   */
+  public Get addColumn(byte [] family, byte [] qualifier) {
+    NavigableSet<byte []> set = familyMap.get(family);
+    if(set == null) {
+      set = new TreeSet<byte []>(Bytes.BYTES_COMPARATOR);
+    }
+    if (qualifier == null) {
+      qualifier = HConstants.EMPTY_BYTE_ARRAY;
+    }
+    set.add(qualifier);
+    familyMap.put(family, set);
+    return this;
+  }
+
+  /**
+   * Get versions of columns only within the specified timestamp range,
+   * [minStamp, maxStamp).
+   * @param minStamp minimum timestamp value, inclusive
+   * @param maxStamp maximum timestamp value, exclusive
+   * @throws IOException if invalid time range
+   * @return this for invocation chaining
+   */
+  public Get setTimeRange(long minStamp, long maxStamp)
+  throws IOException {
+    tr = new TimeRange(minStamp, maxStamp);
+    return this;
+  }
+
+  /**
+   * Get versions of columns with the specified timestamp.
+   * @param timestamp version timestamp
+   * @return this for invocation chaining
+   */
+  public Get setTimeStamp(long timestamp) {
+    try {
+      tr = new TimeRange(timestamp, timestamp+1);
+    } catch(IOException e) {
+      // Will never happen
+    }
+    return this;
+  }
+
+  /**
+   * Get all available versions.
+   * @return this for invocation chaining
+   */
+  public Get setMaxVersions() {
+    this.maxVersions = Integer.MAX_VALUE;
+    return this;
+  }
+
+  /**
+   * Get up to the specified number of versions of each column.
+   * @param maxVersions maximum versions for each column
+   * @throws IOException if invalid number of versions
+   * @return this for invocation chaining
+   */
+  public Get setMaxVersions(int maxVersions) throws IOException {
+    if(maxVersions <= 0) {
+      throw new IOException("maxVersions must be positive");
+    }
+    this.maxVersions = maxVersions;
+    return this;
+  }
+
+  /**
+   * Set the maximum number of values to return per row per Column Family
+   * @param limit the maximum number of values returned / row / CF
+   * @return this for invocation chaining
+   */
+  public Get setMaxResultsPerColumnFamily(int limit) {
+    this.storeLimit = limit;
+    return this;
+  }
+
+  /**
+   * Set offset for the row per Column Family. This offset is only within a particular row/CF
+   * combination. It gets reset back to zero when we move to the next row or CF.
+   * @param offset is the number of kvs that will be skipped.
+   * @return this for invocation chaining
+   */
+  public Get setRowOffsetPerColumnFamily(int offset) {
+    this.storeOffset = offset;
+    return this;
+  }
+
+  /**
+   * Apply the specified server-side filter when performing the Get.
+   * Only {@link Filter#filterKeyValue(KeyValue)} is called AFTER all tests
+   * for ttl, column match, deletes and max versions have been run.
+   * @param filter filter to run on the server
+   * @return this for invocation chaining
+   */
+  public Get setFilter(Filter filter) {
+    this.filter = filter;
+    return this;
+  }
+
+  /* Accessors */
+
+  /**
+   * @return Filter
+   */
+  public Filter getFilter() {
+    return this.filter;
+  }
+
+  /**
+   * Set whether blocks should be cached for this Get.
+   * <p>
+   * This is true by default.  When true, default settings of the table and
+   * family are used (this will never override caching blocks if the block
+   * cache is disabled for that family or entirely).
+   *
+   * @param cacheBlocks if false, default settings are overridden and blocks
+   * will not be cached
+   */
+  public void setCacheBlocks(boolean cacheBlocks) {
+    this.cacheBlocks = cacheBlocks;
+  }
+
+  /**
+   * Get whether blocks should be cached for this Get.
+   * @return true if default caching should be used, false if blocks should not
+   * be cached
+   */
+  public boolean getCacheBlocks() {
+    return cacheBlocks;
+  }
+
+  /**
+   * Method for retrieving the get's row
+   * @return row
+   */
+  public byte [] getRow() {
+    return this.row;
+  }
+
+  /**
+   * Method for retrieving the get's maximum number of versions
+   * @return the maximum number of versions to fetch for this get
+   */
+  public int getMaxVersions() {
+    return this.maxVersions;
+  }
+
+  /**
+   * Method for retrieving the get's maximum number of values
+   * to return per Column Family
+   * @return the maximum number of values to fetch per CF
+   */
+  public int getMaxResultsPerColumnFamily() {
+    return this.storeLimit;
+  }
+
+  /**
+   * Method for retrieving the get's offset per row per column
+   * family (#kvs to be skipped)
+   * @return the row offset
+   */
+  public int getRowOffsetPerColumnFamily() {
+    return this.storeOffset;
+  }
+
+  /**
+   * Method for retrieving the get's TimeRange
+   * @return timeRange
+   */
+  public TimeRange getTimeRange() {
+    return this.tr;
+  }
+
+  /**
+   * Method for retrieving the keys in the familyMap
+   * @return keys in the current familyMap
+   */
+  public Set<byte[]> familySet() {
+    return this.familyMap.keySet();
+  }
+
+  /**
+   * Method for retrieving the number of families to get from
+   * @return number of families
+   */
+  public int numFamilies() {
+    return this.familyMap.size();
+  }
+
+  /**
+   * Method for checking if any families have been inserted into this Get
+   * @return true if familyMap is non-empty, false otherwise
+   */
+  public boolean hasFamilies() {
+    return !this.familyMap.isEmpty();
+  }
+
+  /**
+   * Method for retrieving the get's familyMap
+   * @return familyMap
+   */
+  public Map<byte[],NavigableSet<byte[]>> getFamilyMap() {
+    return this.familyMap;
+  }
+
+  /**
+   * Compile the table and column family (i.e. schema) information
+   * into a String. Useful for parsing and aggregation by debugging,
+   * logging, and administration tools.
+   * @return Map
+   */
+  @Override
+  public Map<String, Object> getFingerprint() {
+    Map<String, Object> map = new HashMap<String, Object>();
+    List<String> families = new ArrayList<String>();
+    map.put("families", families);
+    for (Map.Entry<byte [], NavigableSet<byte[]>> entry :
+      this.familyMap.entrySet()) {
+      families.add(Bytes.toStringBinary(entry.getKey()));
+    }
+    return map;
+  }
+
+  /**
+   * Compile the details beyond the scope of getFingerprint (row, columns,
+   * timestamps, etc.) into a Map along with the fingerprinted information.
+   * Useful for debugging, logging, and administration tools.
+   * @param maxCols a limit on the number of columns output prior to truncation
+   * @return Map
+   */
+  @Override
+  public Map<String, Object> toMap(int maxCols) {
+    // we start with the fingerprint map and build on top of it.
+    Map<String, Object> map = getFingerprint();
+    // replace the fingerprint's simple list of families with a 
+    // map from column families to lists of qualifiers and kv details
+    Map<String, List<String>> columns = new HashMap<String, List<String>>();
+    map.put("families", columns);
+    // add scalar information first
+    map.put("row", Bytes.toStringBinary(this.row));
+    map.put("maxVersions", this.maxVersions);
+    map.put("cacheBlocks", this.cacheBlocks);
+    List<Long> timeRange = new ArrayList<Long>();
+    timeRange.add(this.tr.getMin());
+    timeRange.add(this.tr.getMax());
+    map.put("timeRange", timeRange);
+    int colCount = 0;
+    // iterate through affected families and add details
+    for (Map.Entry<byte [], NavigableSet<byte[]>> entry :
+      this.familyMap.entrySet()) {
+      List<String> familyList = new ArrayList<String>();
+      columns.put(Bytes.toStringBinary(entry.getKey()), familyList);
+      if(entry.getValue() == null) {
+        colCount++;
+        --maxCols;
+        familyList.add("ALL");
+      } else {
+        colCount += entry.getValue().size();
+        if (maxCols <= 0) {
+          continue;
+        }
+        for (byte [] column : entry.getValue()) {
+          if (--maxCols <= 0) {
+            continue;
+          }
+          familyList.add(Bytes.toStringBinary(column));
+        }
+      }   
+    }   
+    map.put("totalColumns", colCount);
+    if (this.filter != null) {
+      map.put("filter", this.filter.toString());
+    }
+    // add the id if set
+    if (getId() != null) {
+      map.put("id", getId());
+    }
+    return map;
+  }
+
+  //Row
+  @Override
+  public int compareTo(Row other) {
+    return Bytes.compareTo(this.getRow(), other.getRow());
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null || getClass() != obj.getClass()) {
+      return false;
+    }
+    Row other = (Row) obj;
+    return compareTo(other) == 0;
+  }
+}
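
And a matching sketch for Get, narrowing a read as the class comment describes; again
not part of this commit, with hypothetical names:

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.Get;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.util.Bytes;

    public class GetExample {
      public static void main(String[] args) throws IOException {
        Configuration conf = HBaseConfiguration.create();
        HTable table = new HTable(conf, "testtable");
        Get g = new Get(Bytes.toBytes("row1"));
        g.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q")); // just cf:q
        g.setTimeRange(0L, 1234L);  // versions with timestamps in [0, 1234)
        g.setMaxVersions(3);        // up to 3 versions of each column
        Result r = table.get(g);
        System.out.println(r);
        table.close();
      }
    }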


