Mailing-List: contact hadoop-commits-help@lucene.apache.org; run by ezmlm
Reply-To: hadoop-dev@lucene.apache.org
To: hadoop-commits@lucene.apache.org
From: jimk@apache.org
Subject: svn commit: r602334 - in /lucene/hadoop/trunk/src/contrib/hbase: ./ src/java/org/apache/hadoop/hbase/ src/test/org/apache/hadoop/hbase/
Date: Sat, 08 Dec 2007 06:54:33 -0000
Message-Id: <20071208065433.ED27B1A9838@eris.apache.org>

Author: jimk
Date: Fri Dec 7 22:54:31 2007
New Revision: 602334

URL: http://svn.apache.org/viewvc?rev=602334&view=rev
Log:
HADOOP-2350 Scanner api returns null row names, or skips row names if
different column families do not have entries for some rows

Added:
    lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestScannerAPI.java
Modified:
    lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMerge.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java
    lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestHRegion.java
    lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestScanner.java
    lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestSplit.java

Modified: lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt?rev=602334&r1=602333&r2=602334&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt Fri Dec 7 22:54:31 2007
@@ -64,6 +64,8 @@
   HADOOP-2338 Fix NullPointerException in master server.
   HADOOP-2380 REST servlet throws NPE when any value node has an empty string
               (Bryan Duxbury via Stack)
+  HADOOP-2350 Scanner api returns null row names, or skips row names if
+              different column families do not have entries for some rows
 
 IMPROVEMENTS
   HADOOP-2401 Add convenience put method that takes writable

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMerge.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMerge.java?rev=602334&r1=602333&r2=602334&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMerge.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMerge.java Fri Dec 7 22:54:31 2007
@@ -332,7 +332,7 @@
       HRegion root = new HRegion(dir, hlog,fs, conf,
           HRegionInfo.rootRegionInfo, null, null);
 
-      HInternalScannerInterface rootScanner =
+      HScannerInterface rootScanner =
         root.getScanner(META_COLS, new Text(), System.currentTimeMillis(), null);
 
       try {

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java?rev=602334&r1=602333&r2=602334&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java Fri Dec 7 22:54:31 2007
@@ -1088,7 +1088,7 @@
    * @return HScannerInterface
    * @throws IOException
    */
-  public HInternalScannerInterface getScanner(Text[] cols, Text firstRow,
+  public HScannerInterface getScanner(Text[] cols, Text firstRow,
       long timestamp, RowFilterInterface filter) throws IOException {
     lock.readLock().lock();
     try {
@@ -1485,33 +1485,21 @@
   /**
    * HScanner is an iterator through a bunch of rows in an HRegion.
    */
-  private class HScanner implements HInternalScannerInterface {
+  private class HScanner implements HScannerInterface {
     private HInternalScannerInterface[] scanners;
-    private boolean wildcardMatch = false;
-    private boolean multipleMatchers = false;
+    private TreeMap<Text, byte[]>[] resultSets;
+    private HStoreKey[] keys;
 
     /** Create an HScanner with a handle on many HStores. */
     @SuppressWarnings("unchecked")
     HScanner(Text[] cols, Text firstRow, long timestamp, HStore[] stores,
         RowFilterInterface filter) throws IOException {
-      this.scanners = new HInternalScannerInterface[stores.length];
-
-      // Advance to the first key in each store.
-      // All results will match the required column-set and scanTime.
-
+      this.scanners = new HInternalScannerInterface[stores.length];
       try {
         for (int i = 0; i < stores.length; i++) {
-          HInternalScannerInterface scanner =
-            scanners[i] =
-              stores[i].getScanner(timestamp, cols, firstRow, filter);
-
-          if (scanner.isWildcardScanner()) {
-            this.wildcardMatch = true;
-          }
-          if (scanner.isMultipleMatchScanner()) {
-            this.multipleMatchers = true;
-          }
-        }
+          scanners[i] = stores[i].getScanner(timestamp, cols, firstRow, filter);
+        }
       } catch(IOException e) {
         for (int i = 0; i < this.scanners.length; i++) {
 
@@ -1521,35 +1509,100 @@
         }
         throw e;
       }
+
+      // Advance to the first key in each store.
+      // All results will match the required column-set and scanTime.
+
+      this.resultSets = new TreeMap[scanners.length];
+      this.keys = new HStoreKey[scanners.length];
+      for (int i = 0; i < scanners.length; i++) {
+        keys[i] = new HStoreKey();
+        resultSets[i] = new TreeMap<Text, byte[]>();
+        if(scanners[i] != null && !scanners[i].next(keys[i], resultSets[i])) {
+          closeScanner(i);
+        }
+      }
+
       // As we have now successfully completed initialization, increment the
       // activeScanner count.
       activeScannerCount.incrementAndGet();
     }
 
-    /** @return true if the scanner is a wild card scanner */
-    public boolean isWildcardScanner() {
-      return wildcardMatch;
-    }
-
-    /** @return true if the scanner is a multiple match scanner */
-    public boolean isMultipleMatchScanner() {
-      return multipleMatchers;
-    }
-
     /** {@inheritDoc} */
     public boolean next(HStoreKey key, SortedMap<Text, byte[]> results)
     throws IOException {
-      boolean haveResults = false;
+      boolean moreToFollow = false;
+
+      // Find the lowest-possible key.
+
+      Text chosenRow = null;
+      long chosenTimestamp = -1;
+      for (int i = 0; i < this.keys.length; i++) {
+        if (scanners[i] != null &&
+            (chosenRow == null ||
+            (keys[i].getRow().compareTo(chosenRow) < 0) ||
+            ((keys[i].getRow().compareTo(chosenRow) == 0) &&
+            (keys[i].getTimestamp() > chosenTimestamp)))) {
+          chosenRow = new Text(keys[i].getRow());
+          chosenTimestamp = keys[i].getTimestamp();
+        }
+      }
+
+      // Store the key and results for each sub-scanner. Merge them as
+      // appropriate.
+      if (chosenTimestamp >= 0) {
+        // Here we are setting the passed in key with current row+timestamp
+        key.setRow(chosenRow);
+        key.setVersion(chosenTimestamp);
+        key.setColumn(HConstants.EMPTY_TEXT);
+
+        for (int i = 0; i < scanners.length; i++) {
+          if (scanners[i] != null && keys[i].getRow().compareTo(chosenRow) == 0) {
+            // NOTE: We used to do results.putAll(resultSets[i]);
+            // but this had the effect of overwriting newer
+            // values with older ones. So now we only insert
+            // a result if the map does not contain the key.
+            for (Map.Entry<Text, byte[]> e : resultSets[i].entrySet()) {
+              if (!results.containsKey(e.getKey())) {
+                results.put(e.getKey(), e.getValue());
+              }
+            }
+            resultSets[i].clear();
+            if (!scanners[i].next(keys[i], resultSets[i])) {
+              closeScanner(i);
+            }
+          }
+        }
+      }
+
       for (int i = 0; i < scanners.length; i++) {
-        if (scanners[i] != null) {
-          if (scanners[i].next(key, results)) {
-            haveResults = true;
-          } else {
+        // If the current scanner is non-null AND has a lower-or-equal
+        // row label, then its timestamp is bad. We need to advance it.
+        while ((scanners[i] != null) &&
+            (keys[i].getRow().compareTo(chosenRow) <= 0)) {
+
+          resultSets[i].clear();
+          if (!scanners[i].next(keys[i], resultSets[i])) {
+            closeScanner(i);
+          }
+        }
+      }
+
+      moreToFollow = chosenTimestamp >= 0;
+      if (results == null || results.size() <= 0) {
+        // If we got no results, then there is no more to follow.
+        moreToFollow = false;
+      }
+
+      // Make sure scanners closed if no more results
+      if (!moreToFollow) {
+        for (int i = 0; i < scanners.length; i++) {
+          if (null != scanners[i]) {
             closeScanner(i);
           }
         }
       }
-      return haveResults;
+      return moreToFollow;
     }
 
@@ -1563,6 +1616,8 @@
         }
       } finally {
         scanners[i] = null;
+        resultSets[i] = null;
+        keys[i] = null;
       }
     }
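The rewritten HScanner.next() above is, at heart, a k-way merge across the
per-HStore scanners: the constructor primes each sub-scanner with its first
key/results pair, next() picks the lowest outstanding row, merges the column
results of every sub-scanner positioned on that row, and then advances each
of them past it. What follows is a minimal, self-contained sketch of that
merge pattern, not code from the patch: it uses plain String rows and values
in place of the real Text/HStoreKey/byte[] types and ignores timestamps and
row filters, so only the shape of the logic carries over.

import java.util.AbstractMap;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

/**
 * Toy k-way merge over sorted per-store row streams, mirroring the shape of
 * the new HScanner.next() logic (simplified types, no timestamps or filters).
 */
public class MergeScanSketch {

  /** Each "store" yields row -> (column -> value) entries in row order. */
  static class StoreCursor {
    private final Iterator<Map.Entry<String, SortedMap<String, String>>> it;
    String row;                         // current row; null when exhausted
    SortedMap<String, String> results;  // current row's columns

    StoreCursor(SortedMap<String, SortedMap<String, String>> data) {
      this.it = data.entrySet().iterator();
      advance();  // prime the cursor, like the constructor's initial next()
    }

    final void advance() {
      if (it.hasNext()) {
        Map.Entry<String, SortedMap<String, String>> e = it.next();
        row = e.getKey();
        results = e.getValue();
      } else {
        row = null;  // plays the role of closeScanner(i)
      }
    }
  }

  /**
   * One merged row per call: the lowest outstanding row wins, the results of
   * every store positioned on that row are combined (first-seen value kept
   * per column), and all cursors at or below the chosen row are advanced.
   * Returns null when every cursor is exhausted.
   */
  static Map.Entry<String, SortedMap<String, String>> next(
      List<StoreCursor> cursors) {
    String chosenRow = null;
    for (StoreCursor c : cursors) {
      if (c.row != null
          && (chosenRow == null || c.row.compareTo(chosenRow) < 0)) {
        chosenRow = c.row;
      }
    }
    if (chosenRow == null) {
      return null;  // no more to follow
    }
    SortedMap<String, String> merged = new TreeMap<String, String>();
    for (StoreCursor c : cursors) {
      while (c.row != null && c.row.compareTo(chosenRow) <= 0) {
        for (Map.Entry<String, String> e : c.results.entrySet()) {
          // Insert only if absent so an earlier (newer) value is never
          // overwritten -- the putAll() problem the patch's NOTE describes.
          if (!merged.containsKey(e.getKey())) {
            merged.put(e.getKey(), e.getValue());
          }
        }
        c.advance();
      }
    }
    return new AbstractMap.SimpleEntry<String, SortedMap<String, String>>(
        chosenRow, merged);
  }

  public static void main(String[] args) {
    // Store "a" has rows 1 and 2; store "b" has row 2 only -- the sparse
    // column-family shape in which the old code could skip or null out row 1.
    SortedMap<String, SortedMap<String, String>> a =
        new TreeMap<String, SortedMap<String, String>>();
    SortedMap<String, String> a1 = new TreeMap<String, String>();
    a1.put("a:1", "1");
    a.put("1", a1);
    SortedMap<String, String> a2 = new TreeMap<String, String>();
    a2.put("a:2", "2");
    a.put("2", a2);

    SortedMap<String, SortedMap<String, String>> b =
        new TreeMap<String, SortedMap<String, String>>();
    SortedMap<String, String> b2 = new TreeMap<String, String>();
    b2.put("b:2", "2");
    b.put("2", b2);

    List<StoreCursor> cursors =
        Arrays.asList(new StoreCursor(a), new StoreCursor(b));
    Map.Entry<String, SortedMap<String, String>> row;
    while ((row = next(cursors)) != null) {
      System.out.println(row.getKey() + " -> " + row.getValue());
      // prints: 1 -> {a:1=1}  then  2 -> {a:2=2, b:2=2}
    }
  }
}

Run against the same sparse layout the new test loads, the sketch emits row 1
before row 2; under the old per-scanner delegation, exactly this layout could
yield a null or skipped row name.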
Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java?rev=602334&r1=602333&r2=602334&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java Fri Dec 7 22:54:31 2007
@@ -1373,7 +1373,7 @@
     requestCount.incrementAndGet();
     try {
       String scannerName = String.valueOf(scannerId);
-      HInternalScannerInterface s = scanners.get(scannerName);
+      HScannerInterface s = scanners.get(scannerName);
       if (s == null) {
         throw new UnknownScannerException("Name: " + scannerName);
       }
@@ -1433,7 +1433,7 @@
     try {
       HRegion r = getRegion(regionName);
       long scannerId = -1L;
-      HInternalScannerInterface s =
+      HScannerInterface s =
         r.getScanner(cols, firstRow, timestamp, filter);
       scannerId = rand.nextLong();
       String scannerName = String.valueOf(scannerId);
@@ -1457,7 +1457,7 @@
     requestCount.incrementAndGet();
     try {
       String scannerName = String.valueOf(scannerId);
-      HInternalScannerInterface s = null;
+      HScannerInterface s = null;
       synchronized(scanners) {
         s = scanners.remove(scannerName);
       }
@@ -1472,9 +1472,8 @@
     }
   }
 
-  Map<String, HInternalScannerInterface> scanners =
-    Collections.synchronizedMap(new HashMap<String, HInternalScannerInterface>());
+  Map<String, HScannerInterface> scanners =
+    Collections.synchronizedMap(new HashMap<String, HScannerInterface>());
 
   /**
    * Instantiated as a scanner lease.
@@ -1490,7 +1489,7 @@
     /** {@inheritDoc} */
     public void leaseExpired() {
       LOG.info("Scanner " + this.scannerName + " lease expired");
-      HInternalScannerInterface s = null;
+      HScannerInterface s = null;
       synchronized(scanners) {
         s = scanners.remove(this.scannerName);
       }

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestHRegion.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestHRegion.java?rev=602334&r1=602333&r2=602334&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestHRegion.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestHRegion.java Fri Dec 7 22:54:31 2007
@@ -283,7 +283,7 @@
 
     startTime = System.currentTimeMillis();
 
-    HInternalScannerInterface s =
+    HScannerInterface s =
       r.getScanner(cols, new Text(), System.currentTimeMillis(), null);
     int numFetched = 0;
     try {
@@ -630,7 +630,7 @@
 
     long startTime = System.currentTimeMillis();
 
-    HInternalScannerInterface s =
+    HScannerInterface s =
       r.getScanner(cols, new Text(), System.currentTimeMillis(), null);
     try {

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestScanner.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestScanner.java?rev=602334&r1=602333&r2=602334&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestScanner.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestScanner.java Fri Dec 7 22:54:31 2007
@@ -69,7 +69,7 @@
 
   private void scan(boolean validateStartcode, String serverName)
   throws IOException {
-    HInternalScannerInterface scanner = null;
+    HScannerInterface scanner = null;
     TreeMap<Text, byte[]> results = new TreeMap<Text, byte[]>();
     HStoreKey key = new HStoreKey();
 
@@ -108,7 +108,7 @@
       }
 
     } finally {
-      HInternalScannerInterface s = scanner;
+      HScannerInterface s = scanner;
       scanner = null;
       if(s != null) {
         s.close();

Added: lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestScannerAPI.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestScannerAPI.java?rev=602334&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestScannerAPI.java (added)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestScannerAPI.java Fri Dec 7 22:54:31 2007
@@ -0,0 +1,161 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+import org.apache.hadoop.io.Text;
+
+/** test the scanner API at all levels */
+public class TestScannerAPI extends HBaseClusterTestCase {
+  private final Text[] columns = new Text[] {
+    new Text("a:"),
+    new Text("b:")
+  };
+  private final Text startRow = new Text("0");
+
+  private final TreeMap<Text, SortedMap<Text, byte[]>> values =
+    new TreeMap<Text, SortedMap<Text, byte[]>>();
+
+  /**
+   * @throws Exception
+   */
+  public TestScannerAPI() throws Exception {
+    super();
+    try {
+      TreeMap<Text, byte[]> columns = new TreeMap<Text, byte[]>();
+      columns.put(new Text("a:1"), "1".getBytes(HConstants.UTF8_ENCODING));
+      values.put(new Text("1"), columns);
+      columns = new TreeMap<Text, byte[]>();
+      columns.put(new Text("a:2"), "2".getBytes(HConstants.UTF8_ENCODING));
+      columns.put(new Text("b:2"), "2".getBytes(HConstants.UTF8_ENCODING));
+      values.put(new Text("2"), columns);
+    } catch (Exception e) {
+      e.printStackTrace();
+      throw e;
+    }
+  }
+
+  /**
+   * @throws IOException
+   */
+  public void testApi() throws IOException {
+    final String tableName = getName();
+
+    // Create table
+
+    HBaseAdmin admin = new HBaseAdmin(conf);
+    HTableDescriptor tableDesc = new HTableDescriptor(tableName);
+    for (int i = 0; i < columns.length; i++) {
+      tableDesc.addFamily(new HColumnDescriptor(columns[i].toString()));
+    }
+    admin.createTable(tableDesc);
+
+    // Insert values
+
+    HTable table = new HTable(conf, new Text(getName()));
+
+    for (Map.Entry<Text, SortedMap<Text, byte[]>> row: values.entrySet()) {
+      long lockid = table.startUpdate(row.getKey());
+      for (Map.Entry<Text, byte[]> val: row.getValue().entrySet()) {
+        table.put(lockid, val.getKey(), val.getValue());
+      }
+      table.commit(lockid);
+    }
+
+    HRegion region = null;
+    try {
+      SortedMap<Text, HRegion> regions =
+        cluster.getRegionThreads().get(0).getRegionServer().getOnlineRegions();
+      for (Map.Entry<Text, HRegion> e: regions.entrySet()) {
+        if (!e.getValue().getRegionInfo().isMetaRegion()) {
+          region = e.getValue();
+        }
+      }
+    } catch (Exception e) {
+      e.printStackTrace();
+      IOException iox = new IOException("error finding region");
+      iox.initCause(e);
+      throw iox;
+    }
+    @SuppressWarnings("null")
+    HScannerInterface scanner =
+      region.getScanner(columns, startRow, System.currentTimeMillis(), null);
+    try {
+      verify(scanner);
+    } finally {
+      scanner.close();
+    }
+
+    scanner = table.obtainScanner(columns, startRow);
+    try {
+      verify(scanner);
+    } finally {
+      scanner.close();
+    }
+
+    scanner = table.obtainScanner(columns, startRow);
+    try {
+      for (Iterator<Map.Entry<HStoreKey, SortedMap<Text, byte[]>>> iterator =
+          scanner.iterator();
+        iterator.hasNext();
+      ) {
+        Map.Entry<HStoreKey, SortedMap<Text, byte[]>> row = iterator.next();
+        HStoreKey key = row.getKey();
+        assertTrue("row key", values.containsKey(key.getRow()));
+
+        SortedMap<Text, byte[]> results = row.getValue();
+        SortedMap<Text, byte[]> columnValues = values.get(key.getRow());
+        assertEquals(columnValues.size(), results.size());
+        for (Map.Entry<Text, byte[]> e: columnValues.entrySet()) {
+          Text column = e.getKey();
+          assertTrue("column", results.containsKey(column));
+          assertTrue("value", Arrays.equals(columnValues.get(column),
+            results.get(column)));
+        }
+      }
+    } finally {
+      scanner.close();
+    }
+  }
+
+  private void verify(HScannerInterface scanner) throws IOException {
+    HStoreKey key = new HStoreKey();
+    SortedMap<Text, byte[]> results = new TreeMap<Text, byte[]>();
+    while (scanner.next(key, results)) {
+      Text row = key.getRow();
+      assertTrue("row key", values.containsKey(row));
+
+      SortedMap<Text, byte[]> columnValues = values.get(row);
+      assertEquals(columnValues.size(), results.size());
+      for (Map.Entry<Text, byte[]> e: columnValues.entrySet()) {
+        Text column = e.getKey();
+        assertTrue("column", results.containsKey(column));
+        assertTrue("value", Arrays.equals(columnValues.get(column),
+          results.get(column)));
+      }
+      results.clear();
+    }
+  }
+}
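TestScannerAPI seeds row 1 with only an a: column and row 2 with both a: and
b: columns -- the sparse-family shape HADOOP-2350 describes. The other half
of the fix, the put-if-absent merge flagged by the NOTE in HScanner.next(),
is easy to see in isolation. Below is a toy, self-contained demonstration,
with String maps standing in for the real Text/byte[] results and assuming,
as the NOTE implies, that sub-scanner results for a row are visited newest
first:

import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

/**
 * Shows the merge bug described by the NOTE in HScanner.next(): a blind
 * putAll() lets a later (older) sub-scanner result clobber an earlier
 * (newer) one, while put-if-absent keeps the newest value.
 */
public class MergeOrderDemo {
  public static void main(String[] args) {
    // Results for the same row and column from two sub-scanners,
    // visited newest first.
    SortedMap<String, String> newer = new TreeMap<String, String>();
    newer.put("a:x", "new-value");
    SortedMap<String, String> older = new TreeMap<String, String>();
    older.put("a:x", "old-value");

    // Old behaviour: merge in visit order with putAll().
    SortedMap<String, String> broken = new TreeMap<String, String>(newer);
    broken.putAll(older);
    System.out.println(broken.get("a:x"));  // old-value  (wrong)

    // New behaviour: insert a result only if the column is not present yet.
    SortedMap<String, String> fixed = new TreeMap<String, String>(newer);
    for (Map.Entry<String, String> e : older.entrySet()) {
      if (!fixed.containsKey(e.getKey())) {
        fixed.put(e.getKey(), e.getValue());
      }
    }
    System.out.println(fixed.get("a:x"));   // new-value  (correct)
  }
}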
Modified: lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestSplit.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestSplit.java?rev=602334&r1=602333&r2=602334&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestSplit.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestSplit.java Fri Dec 7 22:54:31 2007
@@ -228,7 +228,7 @@
       final Text firstValue) throws IOException {
     Text [] cols = new Text[] {new Text(column)};
-    HInternalScannerInterface s = r.getScanner(cols,
+    HScannerInterface s = r.getScanner(cols,
       HConstants.EMPTY_START_ROW, System.currentTimeMillis(), null);
     try {
       HStoreKey curKey = new HStoreKey();