From: rawson@apache.org
To: hbase-commits@hadoop.apache.org
Subject: svn commit: r782178 [3/16] - in /hadoop/hbase/trunk: bin/ src/java/org/apache/hadoop/hbase/ src/java/org/apache/hadoop/hbase/client/ src/java/org/apache/hadoop/hbase/client/tableindexed/ src/java/org/apache/hadoop/hbase/client/transactional/ src/java/o...
Date: Sat, 06 Jun 2009 01:26:27 -0000
Message-Id: <20090606012634.E86E323888C8@eris.apache.org>

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HConnectionManager.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HConnectionManager.java?rev=782178&r1=782177&r2=782178&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HConnectionManager.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HConnectionManager.java Sat Jun 6 01:26:21 2009
@@ -1,5 +1,5 @@
 /**
- * Copyright 2007 The Apache Software Foundation
+ * Copyright 2009 The Apache Software Foundation
  *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.
See the NOTICE file @@ -38,15 +38,12 @@ import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.HServerAddress; -import org.apache.hadoop.hbase.HStoreKey; import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.MasterNotRunningException; import org.apache.hadoop.hbase.RemoteExceptionHandler; import org.apache.hadoop.hbase.TableNotFoundException; import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor; -import org.apache.hadoop.hbase.io.BatchUpdate; -import org.apache.hadoop.hbase.io.Cell; -import org.apache.hadoop.hbase.io.RowResult; import org.apache.hadoop.hbase.ipc.HBaseRPC; import org.apache.hadoop.hbase.ipc.HBaseRPCProtocolVersion; import org.apache.hadoop.hbase.ipc.HMasterInterface; @@ -338,9 +335,9 @@ MetaScannerVisitor visitor = new MetaScannerVisitor() { - public boolean processRow(RowResult rowResult) throws IOException { + public boolean processRow(Result result) throws IOException { HRegionInfo info = Writables.getHRegionInfo( - rowResult.get(COL_REGIONINFO)); + result.getValue(CATALOG_FAMILY, REGIONINFO_QUALIFIER)); // Only examine the rows where the startKey is zero length if (info != null && info.getStartKey().length == 0) { @@ -387,12 +384,13 @@ HRegionInfo.createRegionName(tableName, null, HConstants.ZEROES); byte[] endKey = null; HRegionInfo currentRegion = null; + Scan scan = new Scan(startKey); + scan.addColumn(CATALOG_FAMILY, REGIONINFO_QUALIFIER); ScannerCallable s = new ScannerCallable(this, (Bytes.equals(tableName, HConstants.META_TABLE_NAME) ? HConstants.ROOT_TABLE_NAME : HConstants.META_TABLE_NAME), - HConstants.COL_REGIONINFO_ARRAY, startKey, - HConstants.LATEST_TIMESTAMP, null - ); + scan.getStartRow(), + scan); try { // Open scanner getRegionServerWithRetries(s); @@ -402,27 +400,25 @@ startKey = oldRegion.getEndKey(); } currentRegion = s.getHRegionInfo(); - RowResult r = null; - RowResult[] rrs = null; + Result r = null; + Result [] rrs = null; while ((rrs = getRegionServerWithRetries(s)) != null) { r = rrs[0]; - Cell c = r.get(HConstants.COL_REGIONINFO); - if (c != null) { - byte[] value = c.getValue(); - if (value != null) { - HRegionInfo info = Writables.getHRegionInfoOrNull(value); - if (info != null) { - if (Bytes.equals(info.getTableDesc().getName(), tableName)) { - rowsScanned += 1; - rowsOffline += info.isOffline() ? 1 : 0; - } + byte [] value = r.getValue(HConstants.CATALOG_FAMILY, + HConstants.REGIONINFO_QUALIFIER); + if (value != null) { + HRegionInfo info = Writables.getHRegionInfoOrNull(value); + if (info != null) { + if (Bytes.equals(info.getTableDesc().getName(), tableName)) { + rowsScanned += 1; + rowsOffline += info.isOffline() ? 
1 : 0; } } } } endKey = currentRegion.getEndKey(); - } while (!(endKey == null || HStoreKey.equalsTwoRowKeys(endKey, - HConstants.EMPTY_BYTE_ARRAY))); + } while (!(endKey == null || + Bytes.equals(endKey, HConstants.EMPTY_BYTE_ARRAY))); } finally { s.setClose(); @@ -440,9 +436,9 @@ protected HTableDescriptorFinder(byte[] tableName) { this.tableName = tableName; } - public boolean processRow(RowResult rowResult) throws IOException { + public boolean processRow(Result rowResult) throws IOException { HRegionInfo info = Writables.getHRegionInfo( - rowResult.get(HConstants.COL_REGIONINFO)); + rowResult.getValue(CATALOG_FAMILY, REGIONINFO_QUALIFIER)); HTableDescriptor desc = info.getTableDesc(); if (Bytes.compareTo(desc.getName(), tableName) == 0) { result = desc; @@ -554,21 +550,22 @@ getHRegionConnection(metaLocation.getServerAddress()); // Query the root or meta region for the location of the meta region - RowResult regionInfoRow = server.getClosestRowBefore( + Result regionInfoRow = server.getClosestRowBefore( metaLocation.getRegionInfo().getRegionName(), metaKey, - HConstants.COLUMN_FAMILY); + HConstants.CATALOG_FAMILY); if (regionInfoRow == null) { throw new TableNotFoundException(Bytes.toString(tableName)); } - Cell value = regionInfoRow.get(COL_REGIONINFO); - if (value == null || value.getValue().length == 0) { + byte [] value = regionInfoRow.getValue(CATALOG_FAMILY, + REGIONINFO_QUALIFIER); + if (value == null || value.length == 0) { throw new IOException("HRegionInfo was null or empty in " + Bytes.toString(parentTable)); } // convert the row result into the HRegionLocation we need! HRegionInfo regionInfo = (HRegionInfo) Writables.getWritable( - value.getValue(), new HRegionInfo()); + value, new HRegionInfo()); // possible we got a region of a different table... if (!Bytes.equals(regionInfo.getTableDesc().getName(), tableName)) { throw new TableNotFoundException( @@ -579,8 +576,11 @@ regionInfo.getRegionNameAsString()); } - String serverAddress = - Writables.cellToString(regionInfoRow.get(COL_SERVER)); + value = regionInfoRow.getValue(CATALOG_FAMILY, SERVER_QUALIFIER); + String serverAddress = ""; + if(value != null) { + serverAddress = Bytes.toString(value); + } if (serverAddress.equals("")) { throw new NoServerForRegionException("No server address listed " + "in " + Bytes.toString(parentTable) + " for region " + @@ -680,8 +680,8 @@ // this one. the exception case is when the endkey is EMPTY_START_ROW, // signifying that the region we're checking is actually the last // region in the table. - if (HStoreKey.equalsTwoRowKeys(endKey, HConstants.EMPTY_END_ROW) || - HStoreKey.getComparator(tableName).compareRows(endKey, row) > 0) { + if (Bytes.equals(endKey, HConstants.EMPTY_END_ROW) || + KeyValue.getRowComparator(tableName).compare(endKey, row) > 0) { return possibleRegion; } } @@ -718,7 +718,7 @@ // by nature of the map, we know that the start key has to be < // otherwise it wouldn't be in the headMap. 
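For orientation, the hunks above swap the old RowResult/Cell catalog reads for the new Result accessors. A minimal sketch of the new-style read, using only the constants and helpers already referenced in this patch (the visitor body itself is illustrative, not part of the commit):

    // Reading the HRegionInfo out of a catalog (.META./-ROOT-) row with the 0.20 client API.
    MetaScannerVisitor visitor = new MetaScannerVisitor() {
      public boolean processRow(Result result) throws IOException {
        byte [] bytes = result.getValue(HConstants.CATALOG_FAMILY,
            HConstants.REGIONINFO_QUALIFIER);
        HRegionInfo info = Writables.getHRegionInfoOrNull(bytes);
        if (info != null && !info.isOffline()) {
          // e.g. compare info.getTableDesc().getName() against a target table
        }
        return true;  // keep scanning
      }
    };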
- if (HStoreKey.getComparator(tableName).compareRows(endKey, row) <= 0) { + if (KeyValue.getRowComparator(tableName).compare(endKey, row) <= 0) { // delete any matching entry HRegionLocation rl = tableLocations.remove(matchingRegions.lastKey()); @@ -978,15 +978,15 @@ return location; } - public void processBatchOfRows(ArrayList list, byte[] tableName) + public void processBatchOfRows(ArrayList list, byte[] tableName) throws IOException { if (list.isEmpty()) { return; } boolean retryOnlyOne = false; int tries = 0; - Collections.sort(list); - List tempUpdates = new ArrayList(); + Collections.sort(list); + List currentPuts = new ArrayList(); HRegionLocation location = getRegionLocationForRowWithRetries(tableName, list.get(0).getRow(), false); @@ -994,8 +994,8 @@ byte [] region = currentRegion; boolean isLastRow = false; for (int i = 0; i < list.size() && tries < numRetries; i++) { - BatchUpdate batchUpdate = list.get(i); - tempUpdates.add(batchUpdate); + Put put = list.get(i); + currentPuts.add(put); isLastRow = (i + 1) == list.size(); if (!isLastRow) { location = getRegionLocationForRowWithRetries(tableName, @@ -1003,19 +1003,19 @@ region = location.getRegionInfo().getRegionName(); } if (!Bytes.equals(currentRegion, region) || isLastRow || retryOnlyOne) { - final BatchUpdate[] updates = tempUpdates.toArray(new BatchUpdate[0]); + final Put [] puts = currentPuts.toArray(new Put[0]); int index = getRegionServerWithRetries(new ServerCallable( - this, tableName, batchUpdate.getRow()) { + this, tableName, put.getRow()) { public Integer call() throws IOException { - int i = server.batchUpdates(location.getRegionInfo() - .getRegionName(), updates); + int i = server.put(location.getRegionInfo() + .getRegionName(), puts); return i; } }); if (index != -1) { if (tries == numRetries - 1) { throw new RetriesExhaustedException("Some server", - currentRegion, batchUpdate.getRow(), + currentRegion, put.getRow(), tries, new ArrayList()); } long sleepTime = getPauseTime(tries); @@ -1031,7 +1031,7 @@ } catch (InterruptedException e) { // continue } - i = i - updates.length + index; + i = i - puts.length + index; retryOnlyOne = true; location = getRegionLocationForRowWithRetries(tableName, list.get(i + 1).getRow(), true); @@ -1041,7 +1041,7 @@ retryOnlyOne = false; } currentRegion = region; - tempUpdates.clear(); + currentPuts.clear(); } } } Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HTable.java URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HTable.java?rev=782178&r1=782177&r2=782178&view=diff ============================================================================== --- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HTable.java (original) +++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HTable.java Sat Jun 6 01:26:21 2009 @@ -1,5 +1,5 @@ /** - * Copyright 2008 The Apache Software Foundation + * Copyright 2009 The Apache Software Foundation * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file @@ -34,31 +34,40 @@ import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.HServerAddress; -import org.apache.hadoop.hbase.HStoreKey; import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.UnknownScannerException; import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.filter.RowFilterInterface; import org.apache.hadoop.hbase.filter.StopRowFilter; import org.apache.hadoop.hbase.filter.WhileMatchRowFilter; import org.apache.hadoop.hbase.io.BatchOperation; import org.apache.hadoop.hbase.io.BatchUpdate; import org.apache.hadoop.hbase.io.Cell; -import org.apache.hadoop.hbase.io.RowResult; import org.apache.hadoop.hbase.io.HbaseMapWritable; +import org.apache.hadoop.hbase.io.RowResult; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Writables; + /** * Used to communicate with a single HBase table + * TODO: checkAndSave in oldAPI + * TODO: Converting filters + * TODO: Regex deletes. */ public class HTable { private final HConnection connection; private final byte [] tableName; protected final int scannerTimeout; private volatile HBaseConfiguration configuration; - private ArrayList writeBuffer; + private ArrayList writeBuffer; private long writeBufferSize; private boolean autoFlush; private long currentWriteBufferSize; @@ -113,7 +122,7 @@ conf.getInt("hbase.regionserver.lease.period", 60 * 1000); this.configuration = conf; this.connection.locateRegion(tableName, HConstants.EMPTY_START_ROW); - this.writeBuffer = new ArrayList(); + this.writeBuffer = new ArrayList(); this.writeBufferSize = this.configuration.getLong("hbase.client.write.buffer", 2097152); this.autoFlush = true; @@ -242,7 +251,8 @@ } /** - * Gets the starting and ending row keys for every region in the currently open table + * Gets the starting and ending row keys for every region in the currently + * open table * * @return Pair of arrays of region starting and ending row keys * @throws IOException @@ -252,9 +262,10 @@ final List startKeyList = new ArrayList(); final List endKeyList = new ArrayList(); MetaScannerVisitor visitor = new MetaScannerVisitor() { - public boolean processRow(RowResult rowResult) throws IOException { + public boolean processRow(Result rowResult) throws IOException { HRegionInfo info = Writables.getHRegionInfo( - rowResult.get(HConstants.COL_REGIONINFO)); + rowResult.getValue(HConstants.CATALOG_FAMILY, + HConstants.REGIONINFO_QUALIFIER)); if (Bytes.equals(info.getTableDesc().getName(), getTableName())) { if (!(info.isOffline() || info.isSplit())) { startKeyList.add(info.getStartKey()); @@ -280,18 +291,20 @@ new TreeMap(); MetaScannerVisitor visitor = new MetaScannerVisitor() { - public boolean processRow(RowResult rowResult) throws IOException { + public boolean processRow(Result rowResult) throws IOException { HRegionInfo info = Writables.getHRegionInfo( - rowResult.get(HConstants.COL_REGIONINFO)); + rowResult.getValue(HConstants.CATALOG_FAMILY, + HConstants.REGIONINFO_QUALIFIER)); if (!(Bytes.equals(info.getTableDesc().getName(), getTableName()))) { return false; } HServerAddress server = new HServerAddress(); - Cell 
c = rowResult.get(HConstants.COL_SERVER); - if (c != null && c.getValue() != null && c.getValue().length > 0) { - String address = Bytes.toString(c.getValue()); + byte [] value = rowResult.getValue(HConstants.CATALOG_FAMILY, + HConstants.SERVER_QUALIFIER); + if (value != null && value.length > 0) { + String address = Bytes.toString(value); server = new HServerAddress(address); } @@ -307,12 +320,347 @@ } /** + * Return the row that matches row exactly, + * or the one that immediately preceeds it. + * + * @param regionName region name + * @param row row key + * @param family Column family to look for row in. + * @return map of values + * @throws IOException + * @since 0.20.0 + */ + public Result getRowOrBefore(final byte[] row, final byte[] family) + throws IOException { + return connection.getRegionServerWithRetries( + new ServerCallable(connection, tableName, row) { + public Result call() throws IOException { + return server.getClosestRowBefore(location.getRegionInfo().getRegionName(), + row, family); + } + }); + } + + /** + * Return the row that matches row exactly, + * or the one that immediately preceeds it. + * + * @param regionName region name + * @param row row key + * @param family Column family to look for row in. + * @return map of values + * @throws IOException + * @deprecated As of hbase 0.20.0, replaced by {@link #getRowOrBefore(byte[], byte[]} + */ + public RowResult getClosestRowBefore(final byte[] row, final byte[] family) + throws IOException { + Result r = getRowOrBefore(row, family); + return r == null || r.isEmpty()? null: r.getRowResult(); + } + + /** + * Get a scanner on the current table as specified by the {@link Scan} object + * + * @param scan a configured {@link Scan} object + * @return scanner + * @throws IOException + * @since 0.20.0 + */ + public ResultScanner getScanner(final Scan scan) throws IOException { + ClientScanner s = new ClientScanner(scan); + s.initialize(); + return s; + } + /** + * Get a scanner on the current table as specified by the {@link Scan} object + * + * @param family + * @return + * @throws IOException + * @since 0.20.0 + */ + public ResultScanner getScanner(byte [] family) throws IOException { + Scan scan = new Scan(); + scan.addFamily(family); + return getScanner(scan); + } + + /** + * Get a scanner on the current table as specified by the {@link Scan} object + * + * @param family + * @param qualifier + * @return + * @throws IOException + * @since 0.20.0 + */ + public ResultScanner getScanner(byte [] family, byte [] qualifier) + throws IOException { + Scan scan = new Scan(); + scan.addColumn(family, qualifier); + return getScanner(scan); + } + + /** + * Method for getting data from a row + * @param get the Get to fetch + * @return the result + * @throws IOException + * @since 0.20.0 + */ + public Result get(final Get get) throws IOException { + return connection.getRegionServerWithRetries( + new ServerCallable(connection, tableName, get.getRow()) { + public Result call() throws IOException { + return server.get(location.getRegionInfo().getRegionName(), get); + } + } + ); + } + + /** + * + * @param delete + * @throws IOException + * @since 0.20.0 + */ + public void delete(final Delete delete) + throws IOException { + connection.getRegionServerWithRetries( + new ServerCallable(connection, tableName, delete.getRow()) { + public Boolean call() throws IOException { + System.out.println("IN HT.get.ServerCallable,"); + server.delete(location.getRegionInfo().getRegionName(), delete); + return null; + } + } + ); + } + + /** + * Commit a Put to 
the table. + *

+ * If autoFlush is false, the update is buffered. + * @param put + * @throws IOException + * @since 0.20.0 + */ + public synchronized void put(final Put put) throws IOException { + validatePut(put); + writeBuffer.add(put); + currentWriteBufferSize += put.heapSize(); + if(autoFlush || currentWriteBufferSize > writeBufferSize) { + flushCommits(); + } + } + + /** + * Commit a List of Puts to the table. + *

+ * If autoFlush is false, the update is buffered. + * @param puts + * @throws IOException + * @since 0.20.0 + */ + public synchronized void put(final List puts) throws IOException { + for(Put put : puts) { + validatePut(put); + writeBuffer.add(put); + currentWriteBufferSize += put.heapSize(); + } + if(autoFlush || currentWriteBufferSize > writeBufferSize) { + flushCommits(); + } + } + + /** + * Atomically increments a column value. If the column value isn't long-like, + * this could throw an exception. + * + * @param row + * @param family + * @param qualifier + * @param amount + * @return + * @throws IOException + */ + public long incrementColumnValue(final byte [] row, final byte [] family, + final byte [] qualifier, final long amount) + throws IOException { + NullPointerException npe = null; + if (row == null) { + npe = new NullPointerException("row is null"); + } else if (family == null) { + npe = new NullPointerException("column is null"); + } + if (npe != null) { + IOException io = new IOException( + "Invalid arguments to incrementColumnValue", npe); + throw io; + } + return connection.getRegionServerWithRetries( + new ServerCallable(connection, tableName, row) { + public Long call() throws IOException { + Get get = new Get(row); + get.addColumn(family, qualifier); + return server.incrementColumnValue( + location.getRegionInfo().getRegionName(), row, family, + qualifier, amount); + } + } + ); + } + + /** + * Atomically checks if a row/family/qualifier value match the expectedValue. + * If it does, it adds the put. + * + * @param row + * @param family + * @param qualifier + * @param value the expected value + * @param put + * @throws IOException + * @return true if the new put was execute, false otherwise + */ + public synchronized boolean checkAndPut(final byte [] row, + final byte [] family, final byte [] qualifier, final byte [] value, + final Put put) + throws IOException { + return connection.getRegionServerWithRetries( + new ServerCallable(connection, tableName, row) { + public Boolean call() throws IOException { + return server.checkAndPut(location.getRegionInfo().getRegionName(), + row, family, qualifier, value, put)? Boolean.TRUE: Boolean.FALSE; + } + } + ).booleanValue(); + } + + + /** + * Commit to the table the buffer of BatchUpdate. + * Called automatically in the commit methods when autoFlush is true. + * @throws IOException + */ + public void flushCommits() throws IOException { + try { + connection.processBatchOfRows(writeBuffer, tableName); + } finally { + currentWriteBufferSize = 0; + writeBuffer.clear(); + } + } + + /** + * Release held resources + * + * @throws IOException + */ + public void close() throws IOException{ + flushCommits(); + } + + /** + * Utility method that verifies Put is well formed. 
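A hedged usage sketch of the two atomic operations added above; table, row, family and qualifier names are illustrative, 'table' is assumed to be an open HTable, and the three-argument Put.add(family, qualifier, value) overload is assumed from the 0.20 API:

    // Atomic counter increment; the stored value must be an 8-byte long.
    long hits = table.incrementColumnValue(Bytes.toBytes("row1"),
        Bytes.toBytes("f"), Bytes.toBytes("hits"), 1);

    // Compare-and-set: the Put is applied only if the current value matches.
    Put update = new Put(Bytes.toBytes("row1"));
    update.add(Bytes.toBytes("f"), Bytes.toBytes("q"), Bytes.toBytes("new-value"));
    boolean applied = table.checkAndPut(Bytes.toBytes("row1"), Bytes.toBytes("f"),
        Bytes.toBytes("q"), Bytes.toBytes("old-value"), update);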
+ * @param put + * @throws IllegalArgumentException + * @throws IOException + */ + private void validatePut(final Put put) throws IllegalArgumentException{ + if(put.isEmpty()) { + throw new IllegalArgumentException("No columns to insert"); + } + } + + /** + * Obtain a row lock + * @param row The row to lock + * @return rowLock RowLock containing row and lock id + * @throws IOException + */ + public RowLock lockRow(final byte [] row) + throws IOException { + return connection.getRegionServerWithRetries( + new ServerCallable(connection, tableName, row) { + public RowLock call() throws IOException { + long lockId = + server.lockRow(location.getRegionInfo().getRegionName(), row); + RowLock rowLock = new RowLock(row,lockId); + return rowLock; + } + } + ); + } + + /** + * Release a row lock + * @param rl The row lock to release + * @throws IOException + */ + public void unlockRow(final RowLock rl) + throws IOException { + connection.getRegionServerWithRetries( + new ServerCallable(connection, tableName, rl.getRow()) { + public Boolean call() throws IOException { + server.unlockRow(location.getRegionInfo().getRegionName(), + rl.getLockId()); + return null; + } + } + ); + } + + /** + * Get the value of autoFlush. If true, updates will not be buffered + * @return value of autoFlush + */ + public boolean isAutoFlush() { + return autoFlush; + } + + /** + * Set if this instanciation of HTable will autoFlush + * @param autoFlush + */ + public void setAutoFlush(boolean autoFlush) { + this.autoFlush = autoFlush; + } + + /** + * Get the maximum size in bytes of the write buffer for this HTable + * @return the size of the write buffer in bytes + */ + public long getWriteBufferSize() { + return writeBufferSize; + } + + /** + * Set the size of the buffer in bytes + * @param writeBufferSize + */ + public void setWriteBufferSize(long writeBufferSize) { + this.writeBufferSize = writeBufferSize; + } + + /** + * Get the write buffer + * @return the current write buffer + */ + public ArrayList getWriteBuffer() { + return writeBuffer; + } + + // Old API. Pre-hbase-880, hbase-1304. + + /** * Get a single value for the specified row and column * * @param row row key * @param column column name * @return value for specified row/column * @throws IOException + * @deprecated As of hbase 0.20.0, replaced by {@link #get(Get)} */ public Cell get(final String row, final String column) throws IOException { @@ -327,6 +675,7 @@ * @param numVersions - number of versions to retrieve * @return value for specified row/column * @throws IOException + * @deprecated As of hbase 0.20.0, replaced by {@link #get(Get)} */ public Cell [] get(final String row, final String column, int numVersions) throws IOException { @@ -340,18 +689,15 @@ * @param column column name * @return value for specified row/column * @throws IOException + * @deprecated As of hbase 0.20.0, replaced by {@link #get(Get)} */ public Cell get(final byte [] row, final byte [] column) throws IOException { - return connection.getRegionServerWithRetries( - new ServerCallable(connection, tableName, row) { - public Cell call() throws IOException { - Cell[] result = server.get(location.getRegionInfo().getRegionName(), - row, column, -1, -1); - return (result == null)? null : result[0]; - } - } - ); + Get g = new Get(row); + byte [][] fq = KeyValue.parseColumn(column); + g.addColumn(fq[0], fq[1]); + Result r = get(g); + return r == null || r.size() <= 0? 
null: r.getCellValue(); } /** @@ -361,18 +707,12 @@ * @param numVersions number of versions to retrieve * @return Array of Cells. * @throws IOException + * @deprecated As of hbase 0.20.0, replaced by {@link #get(Get)} */ public Cell [] get(final byte [] row, final byte [] column, - final int numVersions) + final int numVersions) throws IOException { - return connection.getRegionServerWithRetries( - new ServerCallable(connection, tableName, row) { - public Cell[] call() throws IOException { - return server.get(location.getRegionInfo().getRegionName(), row, - column, -1, numVersions); - } - } - ); + return get(row, column, HConstants.LATEST_TIMESTAMP, numVersions); } /** @@ -385,6 +725,7 @@ * @param numVersions - number of versions to retrieve * @return - array of values that match the above criteria * @throws IOException + * @deprecated As of hbase 0.20.0, replaced by {@link #get(Get)} */ public Cell[] get(final String row, final String column, final long timestamp, final int numVersions) @@ -402,28 +743,24 @@ * @param numVersions - number of versions to retrieve * @return - array of values that match the above criteria * @throws IOException + * @deprecated As of hbase 0.20.0, replaced by {@link #get(Get)} */ public Cell[] get(final byte [] row, final byte [] column, final long timestamp, final int numVersions) throws IOException { - Cell[] values = null; - values = connection.getRegionServerWithRetries( - new ServerCallable(connection, tableName, row) { - public Cell[] call() throws IOException { - return server.get(location.getRegionInfo().getRegionName(), row, - column, timestamp, numVersions); - } - } - ); - - if (values != null) { - ArrayList cellValues = new ArrayList(); - for (int i = 0 ; i < values.length; i++) { - cellValues.add(values[i]); - } - return cellValues.toArray(new Cell[values.length]); + Get g = new Get(row); + byte [][] fq = KeyValue.parseColumn(column); + if (fq[1].length == 0) { + g.addFamily(fq[0]); + } else { + g.addColumn(fq[0], fq[1]); + } + g.setMaxVersions(numVersions); + if (timestamp != HConstants.LATEST_TIMESTAMP) { + g.setTimeStamp(timestamp); } - return null; + Result r = get(g); + return r == null || r.size() <= 0? null: r.getCellValues(); } /** @@ -432,6 +769,7 @@ * @param row row key * @return RowResult is null if row does not exist. * @throws IOException + * @deprecated As of hbase 0.20.0, replaced by {@link #get(Get)} */ public RowResult getRow(final String row) throws IOException { return getRow(Bytes.toBytes(row)); @@ -443,6 +781,7 @@ * @param row row key * @return RowResult is null if row does not exist. * @throws IOException + * @deprecated As of hbase 0.20.0, replaced by {@link #get(Get)} */ public RowResult getRow(final byte [] row) throws IOException { return getRow(row, HConstants.LATEST_TIMESTAMP); @@ -455,6 +794,7 @@ * @param numVersions number of versions to return * @return RowResult is null if row does not exist. * @throws IOException + * @deprecated As of hbase 0.20.0, replaced by {@link #get(Get)} */ public RowResult getRow(final String row, final int numVersions) throws IOException { @@ -469,6 +809,7 @@ * @param numVersions number of versions to return * @return RowResult is null if row does not exist. * @throws IOException + * @deprecated As of hbase 0.20.0, replaced by {@link #get(Get)} */ public RowResult getRow(final byte[] row, final int numVersions) throws IOException { @@ -495,6 +836,7 @@ * @param ts timestamp * @return RowResult is null if row does not exist. 
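The deprecated accessors above funnel old-style "family:qualifier" column names into the new Get through KeyValue.parseColumn. A small sketch of the same mapping done directly, restricted to calls that appear in this patch (row and column names are illustrative; 'table' is an HTable):

    byte [][] fq = KeyValue.parseColumn(Bytes.toBytes("info:regioninfo"));
    Get g = new Get(Bytes.toBytes("myrow"));
    if (fq[1].length == 0) {
      g.addFamily(fq[0]);            // bare family such as "info:"
    } else {
      g.addColumn(fq[0], fq[1]);     // family plus qualifier
    }
    g.setMaxVersions(3);
    Result r = table.get(g);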
* @throws IOException + * @deprecated As of hbase 0.20.0, replaced by {@link #get(Get)} */ public RowResult getRow(final byte [] row, final long ts) throws IOException { @@ -515,6 +857,7 @@ * @param numVersions number of versions to return * @return RowResult is null if row does not exist. * @throws IOException + * @deprecated As of hbase 0.20.0, replaced by {@link #get(Get)} */ public RowResult getRow(final byte[] row, final long timestamp, final int numVersions) throws IOException { @@ -528,6 +871,7 @@ * @param columns Array of column names and families you want to retrieve. * @return RowResult is null if row does not exist. * @throws IOException + * @deprecated As of hbase 0.20.0, replaced by {@link #get(Get)} */ public RowResult getRow(final String row, final String [] columns) throws IOException { @@ -541,6 +885,7 @@ * @param columns Array of column names and families you want to retrieve. * @return RowResult is null if row does not exist. * @throws IOException + * @deprecated As of hbase 0.20.0, replaced by {@link #get(Get)} */ public RowResult getRow(final byte [] row, final byte [][] columns) throws IOException { @@ -555,6 +900,7 @@ * @param numVersions number of versions to return * @return RowResult is null if row does not exist. * @throws IOException + * @deprecated As of hbase 0.20.0, replaced by {@link #get(Get)} */ public RowResult getRow(final String row, final String[] columns, final int numVersions) throws IOException { @@ -570,6 +916,7 @@ * @param numVersions number of versions to return * @return RowResult is null if row does not exist. * @throws IOException + * @deprecated As of hbase 0.20.0, replaced by {@link #get(Get)} */ public RowResult getRow(final byte[] row, final byte[][] columns, final int numVersions) throws IOException { @@ -584,6 +931,7 @@ * @param ts timestamp * @return RowResult is null if row does not exist. * @throws IOException + * @deprecated As of hbase 0.20.0, replaced by {@link #get(Get)} */ public RowResult getRow(final String row, final String [] columns, final long ts) @@ -599,6 +947,7 @@ * @param ts timestamp * @return RowResult is null if row does not exist. * @throws IOException + * @deprecated As of hbase 0.20.0, replaced by {@link #get(Get)} */ public RowResult getRow(final byte [] row, final byte [][] columns, final long ts) @@ -625,36 +974,29 @@ * @param rl row lock * @return RowResult is null if row does not exist. * @throws IOException + * @deprecated As of hbase 0.20.0, replaced by {@link #get(Get)} */ public RowResult getRow(final byte [] row, final byte [][] columns, final long ts, final int numVersions, final RowLock rl) - throws IOException { - return connection.getRegionServerWithRetries( - new ServerCallable(connection, tableName, row) { - public RowResult call() throws IOException { - long lockId = -1L; - if(rl != null) { - lockId = rl.getLockId(); - } - return server.getRow(location.getRegionInfo().getRegionName(), row, - columns, ts, numVersions, lockId); - } - } - ); - } - - public RowResult getClosestRowBefore(final byte[] row, final byte[] columnFamily) throws IOException { - return connection.getRegionServerWithRetries( - new ServerCallable(connection,tableName,row) { - public RowResult call() throws IOException { - return server.getClosestRowBefore( - location.getRegionInfo().getRegionName(), row, columnFamily - ); - } + Get g = rl != null? 
new Get(row, rl): new Get(row); + if (columns != null) { + for (int i = 0; i < columns.length; i++) { + byte[][] splits = KeyValue.parseColumn(columns[i]); + if (splits[1].length == 0) { + g.addFamily(splits[0]); + } else { + g.addColumn(splits[0], splits[1]); } - ); + } } + g.setMaxVersions(numVersions); + if (ts != HConstants.LATEST_TIMESTAMP) { + g.setTimeStamp(ts); + } + Result r = get(g); + return r == null || r.size() <= 0? null: r.getRowResult(); + } /** * Get a scanner on the current table starting at first row. @@ -667,6 +1009,7 @@ * \+|^&*$[]]}{)(. * @return scanner * @throws IOException + * @deprecated As of hbase 0.20.0, replaced by {@link #getScanner(Scan)} */ public Scanner getScanner(final String [] columns) throws IOException { @@ -685,6 +1028,7 @@ * @param startRow starting row in table to scan * @return scanner * @throws IOException + * @deprecated As of hbase 0.20.0, replaced by {@link #getScanner(Scan)} */ public Scanner getScanner(final String [] columns, final String startRow) throws IOException { @@ -702,6 +1046,7 @@ * \+|^&*$[]]}{)(. * @return scanner * @throws IOException + * @deprecated As of hbase 0.20.0, replaced by {@link #getScanner(Scan)} */ public Scanner getScanner(final byte[][] columns) throws IOException { @@ -721,6 +1066,7 @@ * @param startRow starting row in table to scan * @return scanner * @throws IOException + * @deprecated As of hbase 0.20.0, replaced by {@link #getScanner(Scan)} */ public Scanner getScanner(final byte[][] columns, final byte [] startRow) throws IOException { @@ -740,6 +1086,7 @@ * @param timestamp only return results whose timestamp <= this value * @return scanner * @throws IOException + * @deprecated As of hbase 0.20.0, replaced by {@link #getScanner(Scan)} */ public Scanner getScanner(final byte[][] columns, final byte [] startRow, long timestamp) @@ -760,6 +1107,7 @@ * @param filter a row filter using row-key regexp and/or column data filter. * @return scanner * @throws IOException + * @deprecated As of hbase 0.20.0, replaced by {@link #getScanner(Scan)} */ public Scanner getScanner(final byte[][] columns, final byte [] startRow, RowFilterInterface filter) @@ -783,12 +1131,12 @@ * stopRow itself. * @return scanner * @throws IOException + * @deprecated As of hbase 0.20.0, replaced by {@link #getScanner(Scan)} */ public Scanner getScanner(final byte [][] columns, final byte [] startRow, final byte [] stopRow) throws IOException { - return getScanner(columns, startRow, stopRow, - HConstants.LATEST_TIMESTAMP); + return getScanner(columns, startRow, stopRow, HConstants.LATEST_TIMESTAMP); } /** @@ -808,6 +1156,7 @@ * @param timestamp only return results whose timestamp <= this value * @return scanner * @throws IOException + * @deprecated As of hbase 0.20.0, replaced by {@link #getScanner(Scan)} */ public Scanner getScanner(final String [] columns, final String startRow, final String stopRow, final long timestamp) @@ -833,6 +1182,7 @@ * @param timestamp only return results whose timestamp <= this value * @return scanner * @throws IOException + * @deprecated As of hbase 0.20.0, replaced by {@link #getScanner(Scan)} */ public Scanner getScanner(final byte [][] columns, final byte [] startRow, final byte [] stopRow, final long timestamp) @@ -855,6 +1205,7 @@ * @param filter a row filter using row-key regexp and/or column data filter. 
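The deprecated getScanner variants in this stretch all converge on the Scan-based entry point added earlier in the file. A sketch of the direct form, using only methods visible in this patch (start row and column names are illustrative):

    Scan scan = new Scan(Bytes.toBytes("startrow"));
    scan.addColumn(Bytes.toBytes("f"), Bytes.toBytes("q"));
    ResultScanner scanner = table.getScanner(scan);
    try {
      Result r;
      while ((r = scanner.next()) != null) {   // next() returns null when exhausted
        byte [] v = r.getValue(Bytes.toBytes("f"), Bytes.toBytes("q"));
        // process v
      }
    } finally {
      scanner.close();
    }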
* @return scanner * @throws IOException + * @deprecated As of hbase 0.20.0, replaced by {@link #getScanner(Scan)} */ public Scanner getScanner(String[] columns, String startRow, long timestamp, RowFilterInterface filter) @@ -877,16 +1228,30 @@ * @param filter a row filter using row-key regexp and/or column data filter. * @return scanner * @throws IOException + * @deprecated As of hbase 0.20.0, replaced by {@link #getScanner(Scan)} */ public Scanner getScanner(final byte [][] columns, final byte [] startRow, long timestamp, RowFilterInterface filter) throws IOException { - ClientScanner s = new ClientScanner(columns, startRow, - timestamp, filter); + // Convert old-style filter to new. We only do a few types at moment. + // If a whilematchrowfilter and it has a stoprowfilter, handle that. + Scan scan = filter == null? new Scan(startRow): + filter instanceof WhileMatchRowFilter && ((WhileMatchRowFilter)filter).getInternalFilter() instanceof StopRowFilter? + new Scan(startRow, ((StopRowFilter)((WhileMatchRowFilter)filter).getInternalFilter()).getStopRowKey()): + null /*new UnsupportedOperationException("Not handled yet")*/; + for (int i = 0; i < columns.length; i++) { + byte [][] splits = KeyValue.parseColumn(columns[i]); + if (splits[1].length == 0) { + scan.addFamily(splits[0]); + } else { + scan.addColumn(splits[0], splits[1]); + } + } + OldClientScanner s = new OldClientScanner(new ClientScanner(scan)); s.initialize(); return s; } - + /** * Completely delete the row's cells. * @@ -913,6 +1278,7 @@ * @param row Key of the row you want to completely delete. * @param column column to be deleted * @throws IOException + * @deprecated As of hbase 0.20.0, replaced by {@link #delete(Delete)} */ public void deleteAll(final byte [] row, final byte [] column) throws IOException { @@ -925,6 +1291,7 @@ * @param row Key of the row you want to completely delete. * @param ts Delete all cells of the same timestamp or older. * @throws IOException + * @deprecated As of hbase 0.20.0, replaced by {@link #delete(Delete)} */ public void deleteAll(final byte [] row, final long ts) throws IOException { @@ -937,6 +1304,7 @@ * @param row Key of the row you want to completely delete. * @param ts Delete all cells of the same timestamp or older. * @throws IOException + * @deprecated As of hbase 0.20.0, replaced by {@link #delete(Delete)} */ public void deleteAll(final String row, final long ts) throws IOException { @@ -948,6 +1316,7 @@ * @param row Row to update * @param column name of column whose value is to be deleted * @throws IOException + * @deprecated As of hbase 0.20.0, replaced by {@link #delete(Delete)} */ public void deleteAll(final String row, final String column) throws IOException { @@ -961,6 +1330,7 @@ * @param column name of column whose value is to be deleted * @param ts Delete all cells of the same timestamp or older. * @throws IOException + * @deprecated As of hbase 0.20.0, replaced by {@link #delete(Delete)} */ public void deleteAll(final String row, final String column, final long ts) throws IOException { @@ -975,6 +1345,7 @@ * @param column name of column whose value is to be deleted * @param ts Delete all cells of the same timestamp or older. * @throws IOException + * @deprecated As of hbase 0.20.0, replaced by {@link #delete(Delete)} */ public void deleteAll(final byte [] row, final byte [] column, final long ts) throws IOException { @@ -990,28 +1361,14 @@ * @param ts Delete all cells of the same timestamp or older. 
* @param rl Existing row lock * @throws IOException + * @deprecated As of hbase 0.20.0, replaced by {@link #delete(Delete)} */ public void deleteAll(final byte [] row, final byte [] column, final long ts, final RowLock rl) throws IOException { - connection.getRegionServerWithRetries( - new ServerCallable(connection, tableName, row) { - public Boolean call() throws IOException { - long lockId = -1L; - if(rl != null) { - lockId = rl.getLockId(); - } - if (column != null) { - this.server.deleteAll(location.getRegionInfo().getRegionName(), - row, column, ts, lockId); - } else { - this.server.deleteAll(location.getRegionInfo().getRegionName(), - row, ts, lockId); - } - return null; - } - } - ); + Delete d = new Delete(row, ts, rl); + d.deleteColumn(column); + delete(d); } /** @@ -1019,6 +1376,7 @@ * @param row Row to update * @param colRegex column regex expression * @throws IOException + * @deprecated As of hbase 0.20.0, replaced by {@link #delete(Delete)} */ public void deleteAllByRegex(final String row, final String colRegex) throws IOException { @@ -1032,6 +1390,7 @@ * @param colRegex Column Regex expression * @param ts Delete all cells of the same timestamp or older. * @throws IOException + * @deprecated As of hbase 0.20.0, replaced by {@link #delete(Delete)} */ public void deleteAllByRegex(final String row, final String colRegex, final long ts) throws IOException { @@ -1045,6 +1404,7 @@ * @param colRegex Column Regex expression * @param ts Delete all cells of the same timestamp or older. * @throws IOException + * @deprecated As of hbase 0.20.0, replaced by {@link #delete(Delete)} */ public void deleteAllByRegex(final byte [] row, final String colRegex, final long ts) throws IOException { @@ -1060,23 +1420,12 @@ * @param ts Delete all cells of the same timestamp or older. 
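The deleteAll shims above now build a Delete and hand it to delete(Delete). A minimal sketch of the direct call, restricted to the constructor and methods that appear in this patch (row and family names are illustrative; null stands in for an optional RowLock):

    Delete d = new Delete(Bytes.toBytes("myrow"), HConstants.LATEST_TIMESTAMP, null);
    d.deleteFamily(Bytes.toBytes("f"));   // drop every cell in the family
    table.delete(d);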
* @param rl Existing row lock * @throws IOException + * @deprecated As of hbase 0.20.0, replaced by {@link #delete(Delete)} */ public void deleteAllByRegex(final byte [] row, final String colRegex, final long ts, final RowLock rl) throws IOException { - connection.getRegionServerWithRetries( - new ServerCallable(connection, tableName, row) { - public Boolean call() throws IOException { - long lockId = -1L; - if(rl != null) { - lockId = rl.getLockId(); - } - this.server.deleteAllByRegex(location.getRegionInfo().getRegionName(), - row, colRegex, ts, lockId); - return null; - } - } - ); + throw new UnsupportedOperationException("TODO: Not yet implemented"); } /** @@ -1085,6 +1434,7 @@ * @param row The row to operate on * @param family The column family to match * @throws IOException + * @deprecated As of hbase 0.20.0, replaced by {@link #delete(Delete)} */ public void deleteFamily(final String row, final String family) throws IOException { @@ -1097,6 +1447,7 @@ * @param row The row to operate on * @param family The column family to match * @throws IOException + * @deprecated As of hbase 0.20.0, replaced by {@link #delete(Delete)} */ public void deleteFamily(final byte[] row, final byte[] family) throws IOException { @@ -1111,6 +1462,7 @@ * @param family The column family to match * @param timestamp Timestamp to match * @throws IOException + * @deprecated As of hbase 0.20.0, replaced by {@link #delete(Delete)} */ public void deleteFamily(final String row, final String family, final long timestamp) @@ -1126,6 +1478,7 @@ * @param family The column family to match * @param timestamp Timestamp to match * @throws IOException + * @deprecated As of hbase 0.20.0, replaced by {@link #delete(Delete)} */ public void deleteFamily(final byte [] row, final byte [] family, final long timestamp) @@ -1142,23 +1495,15 @@ * @param timestamp Timestamp to match * @param rl Existing row lock * @throws IOException + * @deprecated As of hbase 0.20.0, replaced by {@link #delete(Delete)} */ public void deleteFamily(final byte [] row, final byte [] family, final long timestamp, final RowLock rl) throws IOException { - connection.getRegionServerWithRetries( - new ServerCallable(connection, tableName, row) { - public Boolean call() throws IOException { - long lockId = -1L; - if(rl != null) { - lockId = rl.getLockId(); - } - server.deleteFamily(location.getRegionInfo().getRegionName(), row, - family, timestamp, lockId); - return null; - } - } - ); + // Is this right? LATEST_TS? 
St.Ack + Delete d = new Delete(row, HConstants.LATEST_TIMESTAMP, rl); + d.deleteFamily(family); + delete(d); } /** @@ -1168,6 +1513,7 @@ * @param row The row to operate on * @param familyRegex Column family regex * @throws IOException + * @deprecated As of hbase 0.20.0, replaced by {@link #delete(Delete)} */ public void deleteFamilyByRegex(final String row, final String familyRegex) throws IOException { @@ -1181,6 +1527,7 @@ * @param row The row to operate on * @param familyRegex Column family regex * @throws IOException + * @deprecated As of hbase 0.20.0, replaced by {@link #delete(Delete)} */ public void deleteFamilyByRegex(final byte[] row, final String familyRegex) throws IOException { @@ -1195,6 +1542,7 @@ * @param familyRegex Column family regex * @param timestamp Timestamp to match * @throws IOException + * @deprecated As of hbase 0.20.0, replaced by {@link #delete(Delete)} */ public void deleteFamilyByRegex(final String row, final String familyRegex, final long timestamp) @@ -1210,6 +1558,7 @@ * @param familyRegex Column family regex * @param timestamp Timestamp to match * @throws IOException + * @deprecated As of hbase 0.20.0, replaced by {@link #delete(Delete)} */ public void deleteFamilyByRegex(final byte [] row, final String familyRegex, final long timestamp) @@ -1227,22 +1576,12 @@ * @param timestamp Timestamp to match * @param r1 Existing row lock * @throws IOException + * @deprecated As of hbase 0.20.0, replaced by {@link #delete(Delete)} */ public void deleteFamilyByRegex(final byte[] row, final String familyRegex, - final long timestamp, final RowLock r1) throws IOException { - connection.getRegionServerWithRetries( - new ServerCallable(connection, tableName, row) { - public Boolean call() throws IOException { - long lockId = -1L; - if(r1 != null) { - lockId = r1.getLockId(); - } - server.deleteFamilyByRegex(location.getRegionInfo().getRegionName(), - row, familyRegex, timestamp, lockId); - return null; - } - } - ); + final long timestamp, final RowLock r1) + throws IOException { + throw new UnsupportedOperationException("TODO: Not yet implemented"); } /** @@ -1295,16 +1634,14 @@ */ public boolean exists(final byte [] row, final byte [] column, final long timestamp, final RowLock rl) throws IOException { + final Get g = new Get(row, rl); + g.addColumn(column); + g.setTimeStamp(timestamp); return connection.getRegionServerWithRetries( new ServerCallable(connection, tableName, row) { public Boolean call() throws IOException { - long lockId = -1L; - if (rl != null) { - lockId = rl.getLockId(); - } return Boolean.valueOf(server. 
- exists(location.getRegionInfo().getRegionName(), row, - column, timestamp, lockId)); + exists(location.getRegionInfo().getRegionName(), g)); } } ).booleanValue(); @@ -1315,10 +1652,12 @@ * If autoFlush is false, the update is buffered * @param batchUpdate * @throws IOException + * @deprecated As of hbase 0.20.0, replaced by {@link #delete(Delete)} or + * {@link #put(Put) */ public synchronized void commit(final BatchUpdate batchUpdate) throws IOException { - commit(batchUpdate,null); + commit(batchUpdate, null); } /** @@ -1327,36 +1666,33 @@ * @param batchUpdate * @param rl Existing row lock * @throws IOException + * @deprecated As of hbase 0.20.0, replaced by {@link #delete(Delete)} or + * {@link #put(Put) */ public synchronized void commit(final BatchUpdate batchUpdate, final RowLock rl) throws IOException { - checkRowAndColumns(batchUpdate); - if(rl != null) { - batchUpdate.setRowLock(rl.getLockId()); - } - writeBuffer.add(batchUpdate); - currentWriteBufferSize += batchUpdate.heapSize(); - if (autoFlush || currentWriteBufferSize > writeBufferSize) { - flushCommits(); + for (BatchOperation bo: batchUpdate) { + if (!bo.isPut()) throw new IOException("Only Puts in BU as of 0.20.0"); + Put p = new Put(batchUpdate.getRow(), rl); + p.add(bo.getColumn(),batchUpdate.getTimestamp(), bo.getValue()); + put(p); } } - + /** * Commit a List of BatchUpdate to the table. * If autoFlush is false, the updates are buffered * @param batchUpdates * @throws IOException + * @deprecated As of hbase 0.20.0, replaced by {@link #delete(List)} or + * {@link #put(List) */ public synchronized void commit(final List batchUpdates) throws IOException { + // Am I breaking something here in old API by doing this? for (BatchUpdate bu : batchUpdates) { - checkRowAndColumns(bu); - writeBuffer.add(bu); - currentWriteBufferSize += bu.heapSize(); - } - if (autoFlush || currentWriteBufferSize > writeBufferSize) { - flushCommits(); + commit(bu); } } @@ -1372,150 +1708,7 @@ public synchronized boolean checkAndSave(final BatchUpdate batchUpdate, final HbaseMapWritable expectedValues, final RowLock rl) throws IOException { - checkRowAndColumns(batchUpdate); - if(rl != null) { - batchUpdate.setRowLock(rl.getLockId()); - } - return connection.getRegionServerWithRetries( - new ServerCallable(connection, tableName, batchUpdate.getRow()) { - public Boolean call() throws IOException { - return server.checkAndSave(location.getRegionInfo().getRegionName(), - batchUpdate, expectedValues)? - Boolean.TRUE: Boolean.FALSE; - } - } - ).booleanValue(); - } - - /** - * Commit to the table the buffer of BatchUpdate. - * Called automaticaly in the commit methods when autoFlush is true. - * @throws IOException - */ - public void flushCommits() throws IOException { - try { - connection.processBatchOfRows(writeBuffer, tableName); - } finally { - currentWriteBufferSize = 0; - writeBuffer.clear(); - } - } - - /** - * Release held resources - * - * @throws IOException - */ - public void close() throws IOException{ - flushCommits(); - } - - /** - * Utility method that checks rows existence, length and columns well - * formedness. 
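The deprecated commit(BatchUpdate) path above now unrolls each BatchOperation into a Put. A side-by-side sketch of the old call and its 0.20 equivalent (column name and values are illustrative; the column-plus-timestamp Put.add overload mirrors what the shim itself calls):

    // Pre-0.20 style, still accepted through the deprecated shim:
    BatchUpdate bu = new BatchUpdate(Bytes.toBytes("row1"));
    bu.put(Bytes.toBytes("f:q"), Bytes.toBytes("v"));
    table.commit(bu);

    // What the shim does internally with the new API:
    Put p = new Put(Bytes.toBytes("row1"));
    p.add(Bytes.toBytes("f:q"), HConstants.LATEST_TIMESTAMP, Bytes.toBytes("v"));
    table.put(p);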
- * - * @param bu - * @throws IllegalArgumentException - * @throws IOException - */ - private void checkRowAndColumns(BatchUpdate bu) - throws IllegalArgumentException, IOException { - if (bu.getRow() == null || bu.getRow().length > HConstants.MAX_ROW_LENGTH) { - throw new IllegalArgumentException("Row key is invalid"); - } - for (BatchOperation bo : bu) { - HStoreKey.getFamily(bo.getColumn()); - } - } - - /** - * Obtain a row lock - * @param row The row to lock - * @return rowLock RowLock containing row and lock id - * @throws IOException - */ - public RowLock lockRow(final byte [] row) - throws IOException { - return connection.getRegionServerWithRetries( - new ServerCallable(connection, tableName, row) { - public RowLock call() throws IOException { - long lockId = - server.lockRow(location.getRegionInfo().getRegionName(), row); - RowLock rowLock = new RowLock(row,lockId); - return rowLock; - } - } - ); - } - - /** - * Release a row lock - * @param rl The row lock to release - * @throws IOException - */ - public void unlockRow(final RowLock rl) - throws IOException { - connection.getRegionServerWithRetries( - new ServerCallable(connection, tableName, rl.getRow()) { - public Boolean call() throws IOException { - server.unlockRow(location.getRegionInfo().getRegionName(), - rl.getLockId()); - return null; - } - } - ); - } - - /** - * Get the value of autoFlush. If true, updates will not be buffered - * @return value of autoFlush - */ - public boolean isAutoFlush() { - return autoFlush; - } - - /** - * Set if this instanciation of HTable will autoFlush - * @param autoFlush - */ - public void setAutoFlush(boolean autoFlush) { - this.autoFlush = autoFlush; - } - - /** - * Get the maximum size in bytes of the write buffer for this HTable - * @return the size of the write buffer in bytes - */ - public long getWriteBufferSize() { - return writeBufferSize; - } - - /** - * Set the size of the buffer in bytes - * @param writeBufferSize - */ - public void setWriteBufferSize(long writeBufferSize) { - this.writeBufferSize = writeBufferSize; - } - - /** - * Get the write buffer - * @return the current write buffer - */ - public ArrayList getWriteBuffer() { - return writeBuffer; - } - - public long incrementColumnValue(final byte [] row, final byte [] column, - final long amount) throws IOException { - return connection.getRegionServerWithRetries( - new ServerCallable(connection, tableName, row) { - public Long call() throws IOException { - return server.incrementColumnValue( - location.getRegionInfo().getRegionName(), row, column, amount); - } - } - ); + throw new UnsupportedOperationException("TODO: Not yet implemented"); } /** @@ -1523,59 +1716,45 @@ * If there are multiple regions in a table, this scanner will iterate * through them all. 
*/ - protected class ClientScanner implements Scanner { + protected class ClientScanner implements ResultScanner { private final Log CLIENT_LOG = LogFactory.getLog(this.getClass()); - private byte[][] columns; - private byte [] startRow; - protected long scanTime; + private Scan scan; private boolean closed = false; private HRegionInfo currentRegion = null; private ScannerCallable callable = null; - protected RowFilterInterface filter; - private final LinkedList cache = new LinkedList(); - @SuppressWarnings("hiding") + private final LinkedList cache = new LinkedList(); private final int scannerCaching = HTable.this.scannerCaching; private long lastNext; - - protected ClientScanner(final byte[][] columns, final byte [] startRow, - final long timestamp, final RowFilterInterface filter) { + + protected ClientScanner(final Scan scan) { if (CLIENT_LOG.isDebugEnabled()) { CLIENT_LOG.debug("Creating scanner over " + Bytes.toString(getTableName()) - + " starting at key '" + Bytes.toString(startRow) + "'"); - } - // save off the simple parameters - this.columns = columns; - this.startRow = startRow; - this.scanTime = timestamp; - - // save the filter, and make sure that the filter applies to the data - // we're expecting to pull back - this.filter = filter; - if (filter != null) { - filter.validate(columns); + + " starting at key '" + Bytes.toString(scan.getStartRow()) + "'"); } + this.scan = scan; this.lastNext = System.currentTimeMillis(); + + // Removed filter validation. We have a new format now, only one of all + // the current filters has a validate() method. We can add it back, + // need to decide on what we're going to do re: filter redesign. + // Need, at the least, to break up family from qualifier as separate + // checks, I think it's important server-side filters are optimal in that + // respect. } - //TODO: change visibility to protected - - public void initialize() throws IOException { + protected void initialize() throws IOException { nextScanner(this.scannerCaching); } - - protected byte[][] getColumns() { - return columns; + + protected Scan getScan() { + return scan; } protected long getTimestamp() { - return scanTime; + return lastNext; } - protected RowFilterInterface getFilter() { - return filter; - } - /* * Gets a scanner for the next region. * Returns false if there are no more scanners. @@ -1603,9 +1782,10 @@ return false; } } - + HRegionInfo oldRegion = this.currentRegion; - byte [] localStartKey = oldRegion == null? startRow: oldRegion.getEndKey(); + byte [] localStartKey = + oldRegion == null ? scan.getStartRow() : oldRegion.getEndKey(); if (CLIENT_LOG.isDebugEnabled()) { CLIENT_LOG.debug("Advancing internal scanner to startKey at '" + @@ -1628,8 +1808,7 @@ protected ScannerCallable getScannerCallable(byte [] localStartKey, int nbRows) { ScannerCallable s = new ScannerCallable(getConnection(), - getTableName(), columns, - localStartKey, scanTime, filter); + getTableName(), localStartKey, scan); s.setCaching(nbRows); return s; } @@ -1640,22 +1819,22 @@ * filter. */ private boolean filterSaysStop(final byte [] endKey) { - if (this.filter == null) { + if(!scan.hasFilter()) { return false; } // Let the filter see current row. 
- this.filter.filterRowKey(endKey, 0, endKey.length); - return this.filter.filterAllRemaining(); + scan.getFilter().filterRowKey(endKey, 0, endKey.length); + return scan.getFilter().filterAllRemaining(); } - public RowResult next() throws IOException { + public Result next() throws IOException { // If the scanner is closed but there is some rows left in the cache, // it will first empty it before returning null if (cache.size() == 0 && this.closed) { return null; } if (cache.size() == 0) { - RowResult[] values = null; + Result [] values = null; int countdown = this.scannerCaching; // We need to reset it if it's a new callable that was created // with a countdown in nextScanner @@ -1674,7 +1853,7 @@ } lastNext = System.currentTimeMillis(); if (values != null && values.length > 0) { - for (RowResult rs : values) { + for (Result rs : values) { cache.add(rs); countdown--; } @@ -1693,18 +1872,18 @@ * @return Between zero and nbRows RowResults * @throws IOException */ - public RowResult[] next(int nbRows) throws IOException { + public Result [] next(int nbRows) throws IOException { // Collect values to be returned here - ArrayList resultSets = new ArrayList(nbRows); + ArrayList resultSets = new ArrayList(nbRows); for(int i = 0; i < nbRows; i++) { - RowResult next = next(); + Result next = next(); if (next != null) { resultSets.add(next); } else { break; } } - return resultSets.toArray(new RowResult[resultSets.size()]); + return resultSets.toArray(new Result[resultSets.size()]); } public void close() { @@ -1723,6 +1902,88 @@ closed = true; } + public Iterator iterator() { + return new Iterator() { + // The next RowResult, possibly pre-read + Result next = null; + + // return true if there is another item pending, false if there isn't. + // this method is where the actual advancing takes place, but you need + // to call next() to consume it. hasNext() will only advance if there + // isn't a pending next(). + public boolean hasNext() { + if (next == null) { + try { + next = ClientScanner.this.next(); + return next != null; + } catch (IOException e) { + throw new RuntimeException(e); + } + } + return true; + } + + // get the pending next item and advance the iterator. returns null if + // there is no next item. + public Result next() { + // since hasNext() does the real advancing, we call this to determine + // if there is a next before proceeding. + if (!hasNext()) { + return null; + } + + // if we get to here, then hasNext() has given us an item to return. + // we want to return the item and then null out the next pointer, so + // we use a temporary variable. + Result temp = next; + next = null; + return temp; + } + + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } + } + + /** + * {@link Scanner} implementation made on top of a {@link ResultScanner}. + */ + protected class OldClientScanner implements Scanner { + private final ClientScanner cs; + + OldClientScanner(final ClientScanner cs) { + this.cs = cs; + } + + protected void initialize() throws IOException { + this.cs.initialize(); + } + + @Override + public void close() { + this.cs.close(); + } + + @Override + public RowResult next() throws IOException { + Result r = this.cs.next(); + return r == null || r.isEmpty()? 
null: r.getRowResult(); + } + + @Override + public RowResult [] next(int nbRows) throws IOException { + Result [] rr = this.cs.next(nbRows); + if (rr == null || rr.length == 0) return null; + RowResult [] results = new RowResult[rr.length]; + for (int i = 0; i < rr.length; i++) { + results[i] = rr[i].getRowResult(); + } + return results; + } + + @Override public Iterator iterator() { return new Iterator() { // The next RowResult, possibly pre-read @@ -1735,7 +1996,7 @@ public boolean hasNext() { if (next == null) { try { - next = ClientScanner.this.next(); + next = OldClientScanner.this.next(); return next != null; } catch (IOException e) { throw new RuntimeException(e); @@ -1767,4 +2028,4 @@ }; } } -} +} \ No newline at end of file Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HTablePool.java URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HTablePool.java?rev=782178&r1=782177&r2=782178&view=diff ============================================================================== --- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HTablePool.java (original) +++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HTablePool.java Sat Jun 6 01:26:21 2009 @@ -47,6 +47,7 @@ /** * Get a shared table pool. + * @param config * @param tableName the table name * @return the table pool */ Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/MetaScanner.java URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/MetaScanner.java?rev=782178&r1=782177&r2=782178&view=diff ============================================================================== --- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/MetaScanner.java (original) +++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/MetaScanner.java Sat Jun 6 01:26:21 2009 @@ -5,7 +5,6 @@ import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.io.RowResult; import org.apache.hadoop.hbase.util.Bytes; /** @@ -49,14 +48,14 @@ // Scan over each meta region ScannerCallable callable = null; do { - callable = new ScannerCallable(connection, META_TABLE_NAME, - COLUMN_FAMILY_ARRAY, startRow, LATEST_TIMESTAMP, null); + Scan scan = new Scan(startRow).addFamily(CATALOG_FAMILY); + callable = new ScannerCallable(connection, META_TABLE_NAME, scan.getStartRow(), scan); // Open scanner connection.getRegionServerWithRetries(callable); try { - RowResult r = null; + Result r = null; do { - RowResult [] rrs = connection.getRegionServerWithRetries(callable); + Result [] rrs = connection.getRegionServerWithRetries(callable); if (rrs == null || rrs.length == 0 || rrs[0].size() == 0) { break; } @@ -85,6 +84,6 @@ * @return A boolean to know if it should continue to loop in the region * @throws IOException */ - public boolean processRow(RowResult rowResult) throws IOException; + public boolean processRow(Result rowResult) throws IOException; } } Added: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/Put.java URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/Put.java?rev=782178&view=auto ============================================================================== --- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/Put.java (added) +++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/Put.java Sat Jun 6 01:26:21 2009 @@ -0,0 +1,305 @@ +/* + * 
Copyright 2009 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; + +import org.apache.hadoop.io.Writable; + +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.io.HeapSize; +import org.apache.hadoop.hbase.util.Bytes; + + +/** + * Used to perform Put operations for a single row. + *
<p>
+ * To perform a Put, instantiate a Put object with the row to insert to and + * for each column to be inserted, execute {@link #add(byte[], byte[], byte[]) add} or + * {@link #add(byte[], byte[], long, byte[]) add} if setting the timestamp. + */ +public class Put implements HeapSize, Writable, Comparable<Put> { + private byte [] row = null; + private long timestamp = HConstants.LATEST_TIMESTAMP; + private long lockId = -1L; + private Map<byte [], List<KeyValue>> familyMap = + new TreeMap<byte [], List<KeyValue>>(Bytes.BYTES_COMPARATOR); + + /** Constructor for Writable. DO NOT USE */ + public Put() {} + + /** + * Create a Put operation for the specified row. + * @param row row key + */ + public Put(byte [] row) { + this(row, null); + } + + /** + * Create a Put operation for the specified row, using an existing row lock. + * @param row row key + * @param rowLock previously acquired row lock, or null + */ + public Put(byte [] row, RowLock rowLock) { + if(row == null || row.length > HConstants.MAX_ROW_LENGTH) { + throw new IllegalArgumentException("Row key is invalid"); + } + this.row = row; + if(rowLock != null) { + this.lockId = rowLock.getLockId(); + } + } + + /** + * Copy constructor. Creates a Put operation cloned from the specified Put. + * @param putToCopy put to copy + */ + public Put(Put putToCopy) { + this(putToCopy.getRow(), putToCopy.getRowLock()); + this.familyMap = + new TreeMap<byte [], List<KeyValue>>(Bytes.BYTES_COMPARATOR); + for(Map.Entry<byte [], List<KeyValue>> entry : + putToCopy.getFamilyMap().entrySet()) { + this.familyMap.put(entry.getKey(), entry.getValue()); + } + } + + /** + * Add the specified column and value to this Put operation. + * @param family family name + * @param qualifier column qualifier + * @param value column value + */ + public void add(byte [] family, byte [] qualifier, byte [] value) { + add(family, qualifier, this.timestamp, value); + } + + /** + * Add the specified column and value, with the specified timestamp as + * its version to this Put operation. + * @param column Old style column name with family and qualifier put together + * with a colon. + * @param timestamp version timestamp + * @param value column value + */ + public void add(byte [] column, long timestamp, byte [] value) { + byte [][] parts = KeyValue.parseColumn(column); + add(parts[0], parts[1], timestamp, value); + } + + /** + * Add the specified column and value, with the specified timestamp as + * its version to this Put operation. + * @param family family name + * @param qualifier column qualifier + * @param timestamp version timestamp + * @param value column value + */ + public void add(byte [] family, byte [] qualifier, long timestamp, byte [] value) { + List<KeyValue> list = familyMap.get(family); + if(list == null) { + list = new ArrayList<KeyValue>(); + } + KeyValue kv = new KeyValue(this.row, family, qualifier, timestamp, + KeyValue.Type.Put, value); + list.add(kv); + familyMap.put(family, list); + } + + /** + * Add the specified KeyValue to this Put operation.
+ * @param kv + */ + public void add(KeyValue kv) { + byte [] family = kv.getFamily(); + List<KeyValue> list = familyMap.get(family); + if(list == null) { + list = new ArrayList<KeyValue>(); + } + list.add(kv); + familyMap.put(family, list); + } + + + /** + * Method for retrieving the put's familyMap + * @return familyMap + */ + public Map<byte [], List<KeyValue>> getFamilyMap() { + return this.familyMap; + } + + /** + * Method for retrieving the put's row + * @return row + */ + public byte [] getRow() { + return this.row; + } + + /** + * Method for retrieving the put's RowLock + * @return RowLock + */ + public RowLock getRowLock() { + return new RowLock(this.row, this.lockId); + } + + /** + * Method for retrieving the put's lockId + * @return lockId + */ + public long getLockId() { + return this.lockId; + } + + /** + * Method to check if the familyMap is empty + * @return true if empty, false otherwise + */ + public boolean isEmpty() { + return familyMap.isEmpty(); + } + + /** + * Method for setting the timestamp + * @param timestamp + */ + public void setTimeStamp(long timestamp) { + this.timestamp = timestamp; + } + + public int numFamilies() { + return familyMap.size(); + } + + public int size() { + int size = 0; + for(List<KeyValue> kvList : this.familyMap.values()) { + size += kvList.size(); + } + return size; + } + + /** + * @return String + */ + @Override + public String toString() { + StringBuffer sb = new StringBuffer(); + sb.append("row="); + sb.append(Bytes.toString(this.row)); + sb.append(", families={"); + boolean moreThanOne = false; + for(Map.Entry<byte [], List<KeyValue>> entry : this.familyMap.entrySet()) { + if(moreThanOne) { + sb.append(", "); + } else { + moreThanOne = true; + } + sb.append("(family="); + sb.append(Bytes.toString(entry.getKey())); + sb.append(", keyvalues=("); + boolean moreThanOneB = false; + for(KeyValue kv : entry.getValue()) { + if(moreThanOneB) { + sb.append(", "); + } else { + moreThanOneB = true; + } + sb.append(kv.toString()); + } + sb.append(")"); + } + sb.append("}"); + return sb.toString(); + } + + public int compareTo(Put p) { + return Bytes.compareTo(this.getRow(), p.getRow()); + } + + //HeapSize + public long heapSize() { + long totalSize = 0; + for(Map.Entry<byte [], List<KeyValue>> entry : this.familyMap.entrySet()) { + for(KeyValue kv : entry.getValue()) { + totalSize += kv.heapSize(); + } + } + return totalSize; + } + + //Writable + public void readFields(final DataInput in) + throws IOException { + this.row = Bytes.readByteArray(in); + this.timestamp = in.readLong(); + this.lockId = in.readLong(); + int numFamilies = in.readInt(); + this.familyMap = + new TreeMap<byte [], List<KeyValue>>(Bytes.BYTES_COMPARATOR); + for(int i=0;i<numFamilies;i++) { + byte [] family = Bytes.readByteArray(in); + int numKeys = in.readInt(); + List<KeyValue> keys = new ArrayList<KeyValue>(numKeys); + int totalLen = in.readInt(); + byte [] buf = new byte[totalLen]; + int offset = 0; + for(int j=0;j<numKeys;j++) { + int keyLength = in.readInt(); + in.readFully(buf, offset, keyLength); + keys.add(new KeyValue(buf, offset, keyLength)); + offset += keyLength; + } + this.familyMap.put(family, keys); + } + } + + public void write(final DataOutput out) + throws IOException { + Bytes.writeByteArray(out, this.row); + out.writeLong(this.timestamp); + out.writeLong(this.lockId); + out.writeInt(familyMap.size()); + for(Map.Entry<byte [], List<KeyValue>> entry : familyMap.entrySet()) { + Bytes.writeByteArray(out, entry.getKey()); + List<KeyValue> keys = entry.getValue(); + out.writeInt(keys.size()); + int totalLen = 0; + for(KeyValue kv : keys) { + totalLen += kv.getLength(); + } + out.writeInt(totalLen); + for(KeyValue kv : keys) { + out.writeInt(kv.getLength()); + out.write(kv.getBuffer(), kv.getOffset(), kv.getLength()); + } + } + } +}
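For reference, a minimal sketch of the usage pattern the Put class comment above describes: one Put per row, one add() per cell, with an optional explicit timestamp. The table name, family and qualifier below are made up for illustration, and the concluding HTable.put(Put) call is an assumption, since the HTable side of the new write path is not part of this file.

import java.io.IOException;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class PutSketch {
  public static void main(String [] args) throws IOException {
    HTable table = new HTable(new HBaseConfiguration(), "mytable");
    // Collect every cell for one row in a single Put, then send it in one call.
    Put p = new Put(Bytes.toBytes("row1"));
    p.add(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes("value1"));
    p.add(Bytes.toBytes("info"), Bytes.toBytes("age"), System.currentTimeMillis(),
      Bytes.toBytes("42"));
    table.put(p); // assumed HTable.put(Put) counterpart; not shown in this hunk
  }
}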
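Likewise, a minimal sketch of driving the reworked ClientScanner/ResultScanner path from the HTable.java hunk earlier in this mail: a single Scan object now carries what used to be the separate (columns, startRow, timestamp, filter) arguments. The getScanner(Scan) entry point on HTable is assumed here, as are the table and column names.

import java.io.IOException;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;

public class ScanSketch {
  public static void main(String [] args) throws IOException {
    HTable table = new HTable(new HBaseConfiguration(), "mytable");
    Scan scan = new Scan(Bytes.toBytes("startrow"));
    scan.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"));
    ResultScanner scanner = table.getScanner(scan); // assumed entry point
    try {
      // ResultScanner is Iterable; this loop runs on ClientScanner.iterator() above.
      for (Result r : scanner) {
        byte [] value = r.getValue(Bytes.toBytes("info"), Bytes.toBytes("name"));
        System.out.println(Bytes.toString(r.getRow()) + " => " + Bytes.toString(value));
      }
    } finally {
      scanner.close();
    }
  }
}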